1use std::{
2 collections::{BTreeMap, HashMap},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6 time::Duration,
7};
8
9use facet_core::Shape;
10use moire::sync::mpsc;
11use tokio::sync::{mpsc as tokio_mpsc, oneshot as tokio_oneshot, watch};
12use tracing::{trace, warn};
13use vox_types::{
14 BoxFut, ChannelMessage, ConduitRx, ConduitTx, ConnectionAccept, ConnectionClose, ConnectionId,
15 ConnectionOpen, ConnectionReject, ConnectionSettings, Handler, HandshakeResult, IdAllocator,
16 MaybeSend, MaybeSync, Message, MessageFamily, MessagePayload, Metadata, Parity, RequestBody,
17 RequestId, RequestMessage, RequestResponse, SchemaMessage, SelfRef, SessionResumeKey,
18 SessionRole, TrySendError, VoxDebugSnapshot, VoxObserverHandle,
19};
20use vox_types::{
21 ConnectionCloseReason, ConnectionDebugSnapshot, ConnectionDebugState, DecodeErrorKind,
22 DriverTaskStatus,
23};
24
25mod builders;
26pub use builders::*;
27
/// Timing parameters for session keepalive.
///
/// NOTE(review): how these durations are consumed is not visible in this
/// definition; the descriptions below follow the field names.
#[derive(Clone, Copy, Debug)]
pub struct SessionKeepaliveConfig {
    /// Interval between keepalive pings.
    pub ping_interval: Duration,
    /// How long to wait for a pong before giving up.
    pub pong_timeout: Duration,
}
34
35pub struct ConnectionRequest<'a> {
43 metadata: &'a [vox_types::MetadataEntry<'a>],
44 service: &'a str,
45}
46
47impl<'a> ConnectionRequest<'a> {
48 pub fn new(metadata: &'a [vox_types::MetadataEntry<'a>]) -> Result<Self, SessionError> {
52 let service = vox_types::metadata_get_str(metadata, "vox-service").ok_or_else(|| {
53 SessionError::Protocol("missing required vox-service metadata".into())
54 })?;
55 Ok(Self { metadata, service })
56 }
57
58 pub fn service(&self) -> &str {
60 self.service
61 }
62
63 pub fn transport(&self) -> Option<&str> {
65 vox_types::metadata_get_str(self.metadata, "vox-transport")
66 }
67
68 pub fn peer_addr(&self) -> Option<&str> {
70 vox_types::metadata_get_str(self.metadata, "vox-peer-addr")
71 }
72
73 pub fn is_root(&self) -> bool {
75 !self.is_virtual()
76 }
77
78 pub fn is_virtual(&self) -> bool {
80 vox_types::metadata_get_str(self.metadata, "vox-connection-kind") == Some("virtual")
81 }
82
83 pub fn get_str(&self, key: &str) -> Option<&str> {
85 vox_types::metadata_get_str(self.metadata, key)
86 }
87
88 pub fn get_u64(&self, key: &str) -> Option<u64> {
90 vox_types::metadata_get_u64(self.metadata, key)
91 }
92
93 pub fn metadata(&self) -> &[vox_types::MetadataEntry<'a>] {
95 self.metadata
96 }
97}
98
99pub struct PendingConnection {
106 handle: Option<ConnectionHandle>,
107 caller_slot: Option<Arc<std::sync::Mutex<Option<crate::Caller>>>>,
108 operation_store: Option<Arc<dyn crate::OperationStore>>,
109}
110
111impl PendingConnection {
112 fn new(handle: ConnectionHandle) -> Self {
113 Self {
114 handle: Some(handle),
115 caller_slot: None,
116 operation_store: None,
117 }
118 }
119
120 fn with_caller_slot(
122 handle: ConnectionHandle,
123 caller_slot: Arc<std::sync::Mutex<Option<crate::Caller>>>,
124 operation_store: Option<Arc<dyn crate::OperationStore>>,
125 ) -> Self {
126 Self {
127 handle: Some(handle),
128 caller_slot: Some(caller_slot),
129 operation_store,
130 }
131 }
132
133 pub fn handle_with(mut self, handler: impl Handler<crate::DriverReplySink> + 'static) {
135 let handle = self
136 .handle
137 .take()
138 .expect("PendingConnection already consumed");
139 let conn_id = handle.connection_id();
140 trace!(%conn_id, "PendingConnection::handle_with: creating driver");
141 let mut driver = match self.operation_store.take() {
142 Some(store) => crate::Driver::with_operation_store(handle, handler, store),
143 None => crate::Driver::new(handle, handler),
144 };
145 if let Some(slot) = &self.caller_slot {
146 let caller = crate::Caller::new(driver.caller());
147 *slot.lock().unwrap() = Some(caller);
148 }
149 #[cfg(not(target_arch = "wasm32"))]
150 tokio::spawn(async move {
151 trace!(%conn_id, "PendingConnection driver starting");
152 driver.run().await;
153 trace!(%conn_id, "PendingConnection driver exited");
154 });
155 #[cfg(target_arch = "wasm32")]
156 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
157 }
158
159 pub fn handle_with_client<C: crate::FromVoxSession>(
161 mut self,
162 handler: impl Handler<crate::DriverReplySink> + 'static,
163 ) -> C {
164 let handle = self
165 .handle
166 .take()
167 .expect("PendingConnection already consumed");
168 let conn_id = handle.connection_id();
169 trace!(%conn_id, "PendingConnection::handle_with_client: creating driver");
170 let mut driver = match self.operation_store.take() {
171 Some(store) => crate::Driver::with_operation_store(handle, handler, store),
172 None => crate::Driver::new(handle, handler),
173 };
174 let caller = crate::Caller::new(driver.caller());
175 if let Some(slot) = &self.caller_slot {
176 *slot.lock().unwrap() = Some(caller.clone());
177 }
178 #[cfg(not(target_arch = "wasm32"))]
179 tokio::spawn(async move {
180 trace!(%conn_id, "PendingConnection driver starting");
181 driver.run().await;
182 trace!(%conn_id, "PendingConnection driver exited");
183 });
184 #[cfg(target_arch = "wasm32")]
185 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
186 C::from_vox_session(caller, None)
187 }
188
189 pub fn proxy_to(mut self, other: ConnectionHandle) {
191 let handle = self
192 .handle
193 .take()
194 .expect("PendingConnection already consumed");
195 #[cfg(not(target_arch = "wasm32"))]
196 tokio::spawn(async move {
197 let _ = proxy_connections(handle, other).await;
198 });
199 #[cfg(target_arch = "wasm32")]
200 wasm_bindgen_futures::spawn_local(async move {
201 let _ = proxy_connections(handle, other).await;
202 });
203 }
204
205 pub fn into_handle(mut self) -> ConnectionHandle {
207 self.handle
208 .take()
209 .expect("PendingConnection already consumed")
210 }
211}
212
213impl Drop for PendingConnection {
214 fn drop(&mut self) {
215 if let Some(handle) = self.handle.take() {
216 let conn_id = handle.connection_id();
217 warn!(%conn_id, "PendingConnection dropped without being consumed — closing connection");
218 if let Some(tx) = handle.control_tx.as_ref() {
219 let _ = send_drop_control(tx, DropControlRequest::Close(conn_id));
220 }
221 }
222 }
223}
224
225pub trait ConnectionAcceptor: MaybeSend + MaybeSync + 'static {
227 fn accept(
228 &self,
229 request: &ConnectionRequest,
230 connection: PendingConnection,
231 ) -> Result<(), Metadata<'static>>;
232}
233
234impl<H> ConnectionAcceptor for H
236where
237 H: Handler<crate::DriverReplySink> + Clone + MaybeSend + MaybeSync + 'static,
238{
239 fn accept(
240 &self,
241 _request: &ConnectionRequest,
242 connection: PendingConnection,
243 ) -> Result<(), Metadata<'static>> {
244 connection.handle_with(self.clone());
245 Ok(())
246 }
247}
248
/// Newtype that lets a plain closure act as a [`ConnectionAcceptor`].
pub struct AcceptorFn<F>(
    /// The wrapped closure.
    pub F,
);
251
252impl<F> ConnectionAcceptor for AcceptorFn<F>
253where
254 F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
255 + MaybeSend
256 + MaybeSync
257 + 'static,
258{
259 fn accept(
260 &self,
261 request: &ConnectionRequest,
262 connection: PendingConnection,
263 ) -> Result<(), Metadata<'static>> {
264 (self.0)(request, connection)
265 }
266}
267
268pub fn acceptor_fn<F>(f: F) -> AcceptorFn<F>
270where
271 F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
272 + MaybeSend
273 + MaybeSync
274 + 'static,
275{
276 AcceptorFn(f)
277}
278
279struct OpenRequest {
284 settings: ConnectionSettings,
285 metadata: Metadata<'static>,
286 result_tx: moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>,
287}
288
289struct CloseRequest {
290 conn_id: ConnectionId,
291 metadata: Metadata<'static>,
292 result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
293}
294
295struct ResumeRequest {
296 tx: Arc<dyn DynConduitTx>,
297 rx: Box<dyn DynConduitRx>,
298 handshake_result: HandshakeResult,
299 result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
300}
301
302#[derive(Debug, Clone, Copy)]
303pub(crate) enum DropControlRequest {
304 Shutdown,
305 Close(ConnectionId),
306}
307
/// How a failed request is classified when reported via
/// `ConnectionSender::mark_failure`.
///
/// NOTE(review): the exact meaning of each variant is defined by the
/// consumer of the failures channel, which is not visible here.
#[derive(Debug, Clone, Copy)]
pub(crate) enum FailureDisposition {
    /// The request was cancelled.
    Cancelled,
    /// The outcome of the request is unknown.
    Indeterminate,
}
313
314#[cfg(not(target_arch = "wasm32"))]
315fn send_drop_control(
316 tx: &mpsc::UnboundedSender<DropControlRequest>,
317 req: DropControlRequest,
318) -> Result<(), ()> {
319 tx.send(req).map_err(|_| ())
320}
321
/// Sends `req` on the control channel (wasm build, using `try_send`),
/// collapsing any send error into `Err(())`.
#[cfg(target_arch = "wasm32")]
fn send_drop_control(
    tx: &mpsc::UnboundedSender<DropControlRequest>,
    req: DropControlRequest,
) -> Result<(), ()> {
    match tx.try_send(req) {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
329
330#[derive(Clone)]
341pub struct SessionHandle {
342 open_tx: mpsc::Sender<OpenRequest>,
343 close_tx: mpsc::Sender<CloseRequest>,
344 resume_tx: mpsc::Sender<ResumeRequest>,
345 control_tx: mpsc::UnboundedSender<DropControlRequest>,
346 resume_key: Option<SessionResumeKey>,
347}
348
349impl SessionHandle {
350 pub async fn open<Client: crate::FromVoxSession>(
356 &self,
357 settings: ConnectionSettings,
358 ) -> Result<Client, SessionError> {
359 use crate::{Caller, Driver};
360 use vox_types::{MetadataEntry, MetadataFlags, MetadataValue};
361
362 let metadata: Metadata<'static> = vec![MetadataEntry {
363 key: crate::session::builders::VOX_SERVICE_METADATA_KEY.into(),
364 value: MetadataValue::String(Client::SERVICE_NAME.into()),
365 flags: MetadataFlags::NONE,
366 }];
367 let handle = self.open_connection(settings, metadata).await?;
368 let mut driver = Driver::new(handle, ());
369 let caller = Caller::new(driver.caller());
370 #[cfg(not(target_arch = "wasm32"))]
371 tokio::spawn(async move { driver.run().await });
372 #[cfg(target_arch = "wasm32")]
373 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
374 Ok(Client::from_vox_session(caller, None))
375 }
376
377 pub async fn open_connection(
384 &self,
385 settings: ConnectionSettings,
386 metadata: Metadata<'static>,
387 ) -> Result<ConnectionHandle, SessionError> {
388 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.open_result");
389 self.open_tx
390 .send(OpenRequest {
391 settings,
392 metadata,
393 result_tx,
394 })
395 .await
396 .map_err(|_| SessionError::Protocol("session closed".into()))?;
397 result_rx
398 .await
399 .map_err(|_| SessionError::Protocol("session closed".into()))?
400 }
401
402 pub async fn close_connection(
409 &self,
410 conn_id: ConnectionId,
411 metadata: Metadata<'static>,
412 ) -> Result<(), SessionError> {
413 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.close_result");
414 self.close_tx
415 .send(CloseRequest {
416 conn_id,
417 metadata,
418 result_tx,
419 })
420 .await
421 .map_err(|_| SessionError::Protocol("session closed".into()))?;
422 result_rx
423 .await
424 .map_err(|_| SessionError::Protocol("session closed".into()))?
425 }
426
427 pub(crate) async fn resume_parts(
428 &self,
429 tx: Arc<dyn DynConduitTx>,
430 rx: Box<dyn DynConduitRx>,
431 handshake_result: HandshakeResult,
432 ) -> Result<(), SessionError> {
433 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.resume_result");
434 self.resume_tx
435 .send(ResumeRequest {
436 tx,
437 rx,
438 handshake_result,
439 result_tx,
440 })
441 .await
442 .map_err(|_| SessionError::Protocol("session closed".into()))?;
443 result_rx
444 .await
445 .map_err(|_| SessionError::Protocol("session closed".into()))?
446 }
447
448 pub fn resume_key(&self) -> Option<&SessionResumeKey> {
450 self.resume_key.as_ref()
451 }
452
453 pub fn shutdown(&self) -> Result<(), SessionError> {
455 send_drop_control(&self.control_tx, DropControlRequest::Shutdown)
456 .map_err(|_| SessionError::Protocol("session closed".into()))
457 }
458}
459
460pub struct Session {
468 rx: Box<dyn DynConduitRx>,
470
471 role: SessionRole,
473
474 parity: Parity,
477
478 sess_core: Arc<SessionCore>,
480 peer_supports_retry: bool,
481 local_root_settings: ConnectionSettings,
482 peer_root_settings: Option<ConnectionSettings>,
483 resumable: bool,
484 session_resume_key: Option<SessionResumeKey>,
485
486 conns: BTreeMap<ConnectionId, ConnectionSlot>,
488 root_closed_internal: bool,
490
491 conn_ids: IdAllocator<ConnectionId>,
493
494 on_connection: Option<Arc<dyn ConnectionAcceptor>>,
496
497 open_rx: mpsc::Receiver<OpenRequest>,
499
500 close_rx: mpsc::Receiver<CloseRequest>,
502
503 resume_rx: mpsc::Receiver<ResumeRequest>,
505
506 control_tx: mpsc::UnboundedSender<DropControlRequest>,
508 control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
509
510 keepalive: Option<SessionKeepaliveConfig>,
512 resume_notifier: watch::Sender<u64>,
513 recoverer: Option<Box<dyn ConduitRecoverer>>,
514 recovery_timeout: Option<Duration>,
515 registered_in_registry: bool,
518
519 observer: Option<VoxObserverHandle>,
520}
521
522#[derive(Debug)]
523struct KeepaliveRuntime {
524 ping_interval: Duration,
525 pong_timeout: Duration,
526 next_ping_at: vox_types::time::tokio::Instant,
527 waiting_pong_nonce: Option<u64>,
528 pong_deadline: vox_types::time::tokio::Instant,
529 next_ping_nonce: u64,
530}
531
532#[derive(Debug)]
535pub struct ConnectionState {
536 pub id: ConnectionId,
538
539 pub local_settings: ConnectionSettings,
541
542 pub peer_settings: ConnectionSettings,
544
545 conn_tx: mpsc::Sender<RecvMessage>,
547 closed_tx: watch::Sender<Option<ConnectionCloseReason>>,
548
549 schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
551}
552
553#[derive(Debug)]
554enum ConnectionSlot {
555 Active(ConnectionState),
556 PendingOutbound(PendingOutboundData),
557}
558
559struct PendingOutboundData {
561 local_settings: ConnectionSettings,
562 result_tx: Option<moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>>,
563}
564
565impl std::fmt::Debug for PendingOutboundData {
566 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
567 f.debug_struct("PendingOutbound")
568 .field("local_settings", &self.local_settings)
569 .finish()
570 }
571}
572
573#[derive(Clone)]
574pub(crate) struct ConnectionSender {
575 connection_id: ConnectionId,
576 pub(crate) sess_core: Arc<SessionCore>,
577 failures: Arc<mpsc::UnboundedSender<(RequestId, FailureDisposition)>>,
578}
579
580fn forwarded_payload<'a>(payload: &'a vox_types::Payload<'a>) -> vox_types::Payload<'a> {
581 let vox_types::Payload::PostcardBytes(bytes) = payload else {
582 unreachable!("proxy forwarding expects decoded incoming payload bytes")
583 };
584 vox_types::Payload::PostcardBytes(bytes)
585}
586
587fn forwarded_request_body<'a>(body: &'a RequestBody<'a>) -> RequestBody<'a> {
588 match body {
589 RequestBody::Call(call) => RequestBody::Call(vox_types::RequestCall {
590 method_id: call.method_id,
591 metadata: call.metadata.clone(),
592 args: forwarded_payload(&call.args),
593 schemas: call.schemas.clone(),
594 }),
595 RequestBody::Response(response) => RequestBody::Response(RequestResponse {
596 metadata: response.metadata.clone(),
597 ret: forwarded_payload(&response.ret),
598 schemas: response.schemas.clone(),
599 }),
600 RequestBody::Cancel(cancel) => RequestBody::Cancel(vox_types::RequestCancel {
601 metadata: cancel.metadata.clone(),
602 }),
603 }
604}
605
606fn forwarded_channel_body<'a>(body: &'a vox_types::ChannelBody<'a>) -> vox_types::ChannelBody<'a> {
607 match body {
608 vox_types::ChannelBody::Item(item) => {
609 vox_types::ChannelBody::Item(vox_types::ChannelItem {
610 item: forwarded_payload(&item.item),
611 })
612 }
613 vox_types::ChannelBody::Close(close) => {
614 vox_types::ChannelBody::Close(vox_types::ChannelClose {
615 metadata: close.metadata.clone(),
616 })
617 }
618 vox_types::ChannelBody::Reset(reset) => {
619 vox_types::ChannelBody::Reset(vox_types::ChannelReset {
620 metadata: reset.metadata.clone(),
621 })
622 }
623 vox_types::ChannelBody::GrantCredit(credit) => {
624 vox_types::ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
625 additional: credit.additional,
626 })
627 }
628 }
629}
630
631impl ConnectionSender {
632 pub(crate) fn connection_id(&self) -> ConnectionId {
633 self.connection_id
634 }
635
636 pub(crate) async fn send_with_binder<'a>(
637 &self,
638 msg: ConnectionMessage<'a>,
639 binder: Option<&'a dyn vox_types::ChannelBinder>,
640 ) -> Result<(), ()> {
641 let payload = match msg {
642 ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
643 ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
644 };
645 let message = Message {
646 connection_id: self.connection_id,
647 payload,
648 };
649 self.sess_core
650 .send(message, binder, None)
651 .await
652 .map_err(|_| ())
653 }
654
655 pub async fn send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), ()> {
657 self.send_with_binder(msg, None).await
658 }
659
660 pub(crate) fn try_send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), TrySendError<()>> {
662 let payload = match msg {
663 ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
664 ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
665 };
666 self.sess_core.try_send(
667 Message {
668 connection_id: self.connection_id,
669 payload,
670 },
671 None,
672 None,
673 )
674 }
675
676 pub(crate) async fn send_owned(
678 &self,
679 schemas: Arc<vox_types::SchemaRecvTracker>,
680 msg: SelfRef<ConnectionMessage<'static>>,
681 ) -> Result<(), ()> {
682 let msg_ref = msg.get();
683 let payload = match msg_ref {
684 ConnectionMessage::Request(request) => MessagePayload::RequestMessage(RequestMessage {
685 id: request.id,
686 body: forwarded_request_body(&request.body),
687 }),
688 ConnectionMessage::Channel(channel) => MessagePayload::ChannelMessage(ChannelMessage {
689 id: channel.id,
690 body: forwarded_channel_body(&channel.body),
691 }),
692 };
693
694 self.sess_core
695 .send(
696 Message {
697 connection_id: self.connection_id,
698 payload,
699 },
700 None,
701 Some(&*schemas),
702 )
703 .await
704 .map_err(|_| ())
705 }
706
707 pub async fn send_response<'a>(
709 &self,
710 request_id: RequestId,
711 response: RequestResponse<'a>,
712 ) -> Result<(), ()> {
713 self.send(ConnectionMessage::Request(RequestMessage {
714 id: request_id,
715 body: RequestBody::Response(response),
716 }))
717 .await
718 }
719
720 pub async fn send_response_for_method<'a>(
722 &self,
723 request_id: RequestId,
724 method_id: vox_types::MethodId,
725 mut response: RequestResponse<'a>,
726 ) -> Result<(), ()> {
727 self.prepare_response_for_method(request_id, method_id, &mut response);
728 self.send(ConnectionMessage::Request(RequestMessage {
729 id: request_id,
730 body: RequestBody::Response(response),
731 }))
732 .await
733 }
734
735 pub(crate) fn prepare_response_for_method(
737 &self,
738 request_id: RequestId,
739 method_id: vox_types::MethodId,
740 response: &mut RequestResponse<'_>,
741 ) {
742 self.sess_core.prepare_response_for_method(
743 self.connection_id,
744 request_id,
745 method_id,
746 response,
747 );
748 }
749
750 pub(crate) fn prepare_response_from_source(
752 &self,
753 request_id: RequestId,
754 method_id: vox_types::MethodId,
755 root_type: &vox_types::TypeRef,
756 source: &dyn vox_types::SchemaSource,
757 response: &mut RequestResponse<'_>,
758 ) {
759 self.sess_core.prepare_response_from_source(
760 self.connection_id,
761 request_id,
762 method_id,
763 root_type,
764 source,
765 response,
766 );
767 }
768
769 pub fn mark_failure(&self, request_id: RequestId, disposition: FailureDisposition) {
772 let _ = self.failures.send((request_id, disposition));
773 }
774
775 pub fn prepare_replay_schemas(
779 &self,
780 request_id: RequestId,
781 method_id: vox_types::MethodId,
782 response_shape: &'static Shape,
783 response: &mut RequestResponse<'_>,
784 ) {
785 self.sess_core.prepare_response_from_shape(
786 self.connection_id,
787 request_id,
788 method_id,
789 response_shape,
790 response,
791 );
792 }
793}
794
795pub struct ConnectionHandle {
796 pub(crate) sender: ConnectionSender,
797 pub(crate) rx: mpsc::Receiver<RecvMessage>,
798 pub(crate) failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
799 pub(crate) control_tx: Option<mpsc::UnboundedSender<DropControlRequest>>,
800 pub(crate) closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
801 pub(crate) resumed_rx: watch::Receiver<u64>,
802 pub(crate) local_settings: ConnectionSettings,
803 pub(crate) peer_settings: ConnectionSettings,
804 pub parity: Parity,
806 pub(crate) peer_supports_retry: bool,
807 pub(crate) observer: Option<VoxObserverHandle>,
808}
809
810impl std::fmt::Debug for ConnectionHandle {
811 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812 f.debug_struct("ConnectionHandle")
813 .field("connection_id", &self.sender.connection_id)
814 .finish()
815 }
816}
817
818pub(crate) enum ConnectionMessage<'payload> {
819 Request(RequestMessage<'payload>),
820 Channel(ChannelMessage<'payload>),
821}
822
823vox_types::impl_reborrow!(ConnectionMessage);
824
825pub(crate) struct RecvMessage {
829 pub schemas: Arc<vox_types::SchemaRecvTracker>,
830 pub msg: SelfRef<ConnectionMessage<'static>>,
831}
832
833impl ConnectionHandle {
834 pub fn connection_id(&self) -> ConnectionId {
836 self.sender.connection_id
837 }
838
839 pub async fn closed(&self) {
841 if self.closed_rx.borrow().is_some() {
842 return;
843 }
844 let mut rx = self.closed_rx.clone();
845 while rx.changed().await.is_ok() {
846 if rx.borrow().is_some() {
847 return;
848 }
849 }
850 }
851
852 pub fn is_connected(&self) -> bool {
854 self.closed_rx.borrow().is_none()
855 }
856
857 pub fn close_reason(&self) -> Option<ConnectionCloseReason> {
858 *self.closed_rx.borrow()
859 }
860
861 pub fn peer_supports_retry(&self) -> bool {
862 self.peer_supports_retry
863 }
864
865 pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
867 let (outbound_queue_depth, outbound_queue_capacity) =
868 self.sender.sess_core.outbound_queue_stats();
869 VoxDebugSnapshot {
870 connections: vec![ConnectionDebugSnapshot {
871 connection_id: self.connection_id(),
872 endpoint: None,
873 surface: None,
874 component: None,
875 state: if self.closed_rx.borrow().is_some() {
876 ConnectionDebugState::Closed
877 } else {
878 ConnectionDebugState::Open
879 },
880 outstanding_requests: 0,
881 requests: Vec::new(),
882 open_channels: Vec::new(),
883 outbound_queue_depth: Some(outbound_queue_depth),
884 outbound_queue_capacity: Some(outbound_queue_capacity),
885 local_control_queue_depth: None,
886 local_control_queue_capacity: None,
887 last_inbound_message_at: None,
888 last_outbound_message_at: None,
889 last_progress_at: None,
890 close_reason: *self.closed_rx.borrow(),
891 driver_task_status: DriverTaskStatus::Unknown,
892 }],
893 }
894 }
895
896 pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
897 let snapshot = self.debug_snapshot();
898 tracing::info!(?snapshot, "vox debug snapshot");
899 snapshot
900 }
901}
902
903pub async fn proxy_connections(
909 left: ConnectionHandle,
910 right: ConnectionHandle,
911) -> Result<(), SessionError> {
912 if left.parity == right.parity {
913 return Err(SessionError::Protocol(
914 "proxy_connections requires opposite parities".into(),
915 ));
916 }
917 let left_conn_id = left.connection_id();
918 let right_conn_id = right.connection_id();
919 let ConnectionHandle {
920 sender: left_sender,
921 rx: mut left_rx,
922 failures_rx: _left_failures_rx,
923 control_tx: left_control_tx,
924 closed_rx: _left_closed_rx,
925 resumed_rx: _left_resumed_rx,
926 local_settings: _left_local_settings,
927 peer_settings: _left_peer_settings,
928 parity: _left_parity,
929 peer_supports_retry: _left_peer_supports_retry,
930 observer: _left_observer,
931 } = left;
932 let ConnectionHandle {
933 sender: right_sender,
934 rx: mut right_rx,
935 failures_rx: _right_failures_rx,
936 control_tx: right_control_tx,
937 closed_rx: _right_closed_rx,
938 resumed_rx: _right_resumed_rx,
939 local_settings: _right_local_settings,
940 peer_settings: _right_peer_settings,
941 parity: _right_parity,
942 peer_supports_retry: _right_peer_supports_retry,
943 observer: _right_observer,
944 } = right;
945
946 loop {
947 tokio::select! {
948 recv = left_rx.recv() => {
949 let Some(recv) = recv else {
950 break;
951 };
952 if right_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
953 break;
954 }
955 }
956 recv = right_rx.recv() => {
957 let Some(recv) = recv else {
958 break;
959 };
960 if left_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
961 break;
962 }
963 }
964 }
965 }
966
967 if let Some(tx) = left_control_tx.as_ref() {
968 let _ = send_drop_control(tx, DropControlRequest::Close(left_conn_id));
969 }
970 if let Some(tx) = right_control_tx.as_ref() {
971 let _ = send_drop_control(tx, DropControlRequest::Close(right_conn_id));
972 }
973 Ok(())
974}
975
976#[derive(Debug)]
978pub enum SessionError {
979 Io(std::io::Error),
980 Protocol(String),
981 Rejected(Metadata<'static>),
982 NotResumable,
983 ConnectTimeout,
984}
985
986impl SessionError {
987 pub fn is_retryable(&self) -> bool {
993 matches!(
994 self,
995 Self::Io(_) | Self::ConnectTimeout | Self::NotResumable
996 )
997 }
998}
999
1000impl std::fmt::Display for SessionError {
1001 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1002 match self {
1003 Self::Io(e) => write!(f, "io error: {e}"),
1004 Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
1005 Self::Rejected(_) => write!(f, "connection rejected"),
1006 Self::NotResumable => write!(f, "session is not resumable"),
1007 Self::ConnectTimeout => write!(f, "connect timeout"),
1008 }
1009 }
1010}
1011
1012impl std::error::Error for SessionError {}
1013
1014fn classify_session_recv_error(error: &std::io::Error) -> ConnectionCloseReason {
1015 let message = error.to_string();
1016 if message.contains("decode error") || message.contains("protocol") {
1017 ConnectionCloseReason::Protocol
1018 } else {
1019 ConnectionCloseReason::Transport
1020 }
1021}
1022
1023fn classify_decode_error(error: &std::io::Error) -> Option<DecodeErrorKind> {
1024 let message = error.to_string();
1025 if message.contains("decode error") {
1026 Some(DecodeErrorKind::Payload)
1027 } else {
1028 None
1029 }
1030}
1031
1032impl Session {
1033 fn observe_session_recv_error(&self, error: &std::io::Error) {
1036 let Some(observer) = &self.observer else {
1037 return;
1038 };
1039
1040 if let Some(kind) = classify_decode_error(error) {
1041 for conn_id in self.conns.iter().filter_map(|(conn_id, slot)| {
1042 matches!(slot, ConnectionSlot::Active(_)).then_some(*conn_id)
1043 }) {
1044 observer.driver_event(vox_types::DriverEvent::DecodeError {
1045 connection_id: conn_id,
1046 kind,
1047 });
1048 }
1049 return;
1050 }
1051
1052 observer.transport_event(vox_types::TransportEvent::Closed {
1053 connection_id: None,
1054 reason: classify_session_recv_error(error),
1055 });
1056 }
1057
1058 fn close_connection_for_protocol_error(
1059 &mut self,
1060 conn_id: ConnectionId,
1061 detail: impl std::fmt::Display,
1062 ) {
1063 warn!(%conn_id, "closing connection after protocol error: {detail}");
1064 self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Protocol);
1065 self.maybe_request_shutdown_after_root_closed();
1066 }
1067
1068 fn record_received_schema_cbor(
1069 &mut self,
1070 conn_id: ConnectionId,
1071 schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
1072 method_id: vox_types::MethodId,
1073 direction: vox_types::BindingDirection,
1074 schemas_cbor: &vox_types::CborPayload,
1075 context: &str,
1076 ) -> bool {
1077 let payload = match vox_types::SchemaPayload::from_cbor(&schemas_cbor.0) {
1078 Ok(payload) => payload,
1079 Err(error) => {
1080 self.close_connection_for_protocol_error(
1081 conn_id,
1082 format!("{context}: invalid schema CBOR: {error}"),
1083 );
1084 return false;
1085 }
1086 };
1087
1088 if let Err(error) = schema_recv_tracker.record_received(method_id, direction, payload) {
1089 self.close_connection_for_protocol_error(conn_id, format!("{context}: {error}"));
1090 return false;
1091 }
1092
1093 true
1094 }
1095
1096 #[allow(clippy::too_many_arguments)]
1097 fn pre_handshake<Tx, Rx>(
1098 tx: Tx,
1099 rx: Rx,
1100 on_connection: Option<Arc<dyn ConnectionAcceptor>>,
1101 open_rx: mpsc::Receiver<OpenRequest>,
1102 close_rx: mpsc::Receiver<CloseRequest>,
1103 resume_rx: mpsc::Receiver<ResumeRequest>,
1104 control_tx: mpsc::UnboundedSender<DropControlRequest>,
1105 control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
1106 keepalive: Option<SessionKeepaliveConfig>,
1107 resumable: bool,
1108 recoverer: Option<Box<dyn ConduitRecoverer>>,
1109 recovery_timeout: Option<Duration>,
1110 observer: Option<VoxObserverHandle>,
1111 ) -> Self
1112 where
1113 Tx: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
1114 Rx: ConduitRx<Msg = MessageFamily> + MaybeSend + 'static,
1115 {
1116 let (outbound_tx, outbound_rx) = tokio_mpsc::channel(256);
1117 let sess_core = Arc::new(SessionCore {
1118 inner: std::sync::Mutex::new(SessionCoreInner {
1119 tx: Arc::new(tx) as Arc<dyn DynConduitTx>,
1120 conns: HashMap::new(),
1121 }),
1122 outbound_tx,
1123 observer: observer.clone(),
1124 });
1125 spawn_outbound_worker(outbound_rx);
1126 let (resume_notifier, _resume_rx) = watch::channel(0_u64);
1127 Session {
1128 rx: Box::new(rx),
1129 role: SessionRole::Initiator, parity: Parity::Odd, sess_core,
1132 peer_supports_retry: false,
1133 local_root_settings: ConnectionSettings {
1134 parity: Parity::Odd,
1135 max_concurrent_requests: 64,
1136 initial_channel_credit: 16,
1137 },
1138 peer_root_settings: None,
1139 resumable,
1140 session_resume_key: None,
1141 conns: BTreeMap::new(),
1142 root_closed_internal: false,
1143 conn_ids: IdAllocator::new(Parity::Odd), on_connection,
1145 open_rx,
1146 close_rx,
1147 resume_rx,
1148 control_tx,
1149 control_rx,
1150 keepalive,
1151 resume_notifier,
1152 recoverer,
1153 recovery_timeout,
1154 registered_in_registry: false,
1155 observer,
1156 }
1157 }
1158
1159 pub(crate) fn resume_key(&self) -> Option<SessionResumeKey> {
1160 self.session_resume_key
1161 }
1162
1163 fn establish_from_handshake(
1165 &mut self,
1166 result: HandshakeResult,
1167 ) -> Result<ConnectionHandle, SessionError> {
1168 self.role = result.role;
1169 self.parity = result.our_settings.parity;
1170 self.conn_ids = IdAllocator::new(result.our_settings.parity);
1171 self.local_root_settings = result.our_settings.clone();
1172 self.peer_root_settings = Some(result.peer_settings.clone());
1173 self.peer_supports_retry = result.peer_supports_retry;
1174 self.session_resume_key = result.session_resume_key;
1175
1176 if self.resumable && self.session_resume_key.is_none() {
1177 return Err(SessionError::NotResumable);
1178 }
1179
1180 Ok(self.make_root_handle(result.our_settings, result.peer_settings))
1181 }
1182
1183 fn make_root_handle(
1184 &mut self,
1185 local_settings: ConnectionSettings,
1186 peer_settings: ConnectionSettings,
1187 ) -> ConnectionHandle {
1188 self.make_connection_handle(ConnectionId::ROOT, local_settings, peer_settings)
1189 }
1190
1191 fn make_connection_handle(
1192 &mut self,
1193 conn_id: ConnectionId,
1194 local_settings: ConnectionSettings,
1195 peer_settings: ConnectionSettings,
1196 ) -> ConnectionHandle {
1197 let label = format!("session.conn{}", conn_id.0);
1198 let (conn_tx, conn_rx) = mpsc::channel::<RecvMessage>(&label, 64);
1199 let (failures_tx, failures_rx) = mpsc::unbounded_channel(format!("{label}.failures"));
1200 let (closed_tx, closed_rx) = watch::channel(None);
1201 let resumed_rx = self.resume_notifier.subscribe();
1202
1203 let sender = ConnectionSender {
1204 connection_id: conn_id,
1205 sess_core: Arc::clone(&self.sess_core),
1206 failures: Arc::new(failures_tx),
1207 };
1208
1209 let parity = local_settings.parity;
1210 let handle_local_settings = local_settings.clone();
1211 let handle_peer_settings = peer_settings.clone();
1212 trace!(%conn_id, "make_connection_handle: inserting slot into conns");
1213 if let Some(observer) = &self.observer {
1214 observer.driver_event(vox_types::DriverEvent::ConnectionOpened {
1215 connection_id: conn_id,
1216 });
1217 }
1218 self.conns.insert(
1219 conn_id,
1220 ConnectionSlot::Active(ConnectionState {
1221 id: conn_id,
1222 local_settings,
1223 peer_settings,
1224 conn_tx,
1225 closed_tx,
1226 schema_recv_tracker: Arc::new(vox_types::SchemaRecvTracker::new()),
1227 }),
1228 );
1229
1230 ConnectionHandle {
1231 sender,
1232 rx: conn_rx,
1233 failures_rx,
1234 control_tx: Some(self.control_tx.clone()),
1235 closed_rx,
1236 resumed_rx,
1237 local_settings: handle_local_settings,
1238 peer_settings: handle_peer_settings,
1239 parity,
1240 peer_supports_retry: self.peer_supports_retry,
1241 observer: self.observer.clone(),
1242 }
1243 }
1244
/// Drives the session until it terminates.
///
/// Each loop iteration waits on, in priority order (`biased`): an inbound
/// conduit message, an open request, a close request, a resume request, a
/// drop/control request, and (only when keepalive is enabled) the keepalive
/// poll timer. The loop ends when the conduit breaks and cannot be recovered,
/// when a control request asks for shutdown, or when keepalive fails. Every
/// exit path closes all connections before the function returns.
pub async fn run(&mut self) {
    let mut keepalive_runtime = self.make_keepalive_runtime();
    // The timer exists only when keepalive is enabled. It fires every 10 ms
    // and merely polls; the actual ping/timeout schedule lives in
    // `keepalive_runtime`.
    let mut keepalive_tick = keepalive_runtime.as_ref().map(|_| {
        let mut interval = vox_types::time::tokio::interval(Duration::from_millis(10));
        interval.set_missed_tick_behavior(vox_types::time::tokio::MissedTickBehavior::Delay);
        interval
    });

    loop {
        tokio::select! {
            // Branches are polled top to bottom, so inbound traffic wins ties.
            biased;

            msg = self.rx.recv_msg() => {
                vox_types::dlog!("[session {:?}] recv_msg returned", self.role);
                match msg {
                    Ok(Some(msg)) => {
                        self.handle_message(msg, &mut keepalive_runtime).await;
                    }
                    // Clean EOF: try to recover the conduit, otherwise the peer is gone.
                    Ok(None) => {
                        vox_types::dlog!("[session {:?}] recv loop: conduit returned EOF", self.role);
                        if !self.handle_conduit_break(&mut keepalive_runtime).await {
                            vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                            self.close_all_connections(ConnectionCloseReason::Remote);
                            break;
                        }
                    }
                    // Receive error: same recovery attempt, but the close reason
                    // is derived from the error itself.
                    Err(error) => {
                        let close_reason = classify_session_recv_error(&error);
                        self.observe_session_recv_error(&error);
                        warn!(
                            role = ?self.role,
                            %error,
                            ?close_reason,
                            "session receive failed; closing connections if recovery is unavailable"
                        );
                        vox_types::dlog!("[session {:?}] recv loop: conduit recv error: {}", self.role, error);
                        if !self.handle_conduit_break(&mut keepalive_runtime).await {
                            vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                            self.close_all_connections(close_reason);
                            break;
                        }
                    }
                }
            }
            Some(req) = self.open_rx.recv() => {
                self.handle_open_request(req).await;
            }
            Some(req) = self.close_rx.recv() => {
                self.handle_close_request(req).await;
            }
            // Resume requests are only honoured inside `handle_conduit_break`;
            // while the conduit is healthy they are refused.
            Some(req) = self.resume_rx.recv() => {
                let _ = req.result_tx.send(Err(SessionError::Protocol(
                    "resume is only valid while the session is disconnected".into(),
                )));
            }
            Some(req) = self.control_rx.recv() => {
                // `false` means the session should stop.
                if !self.handle_drop_control_request(req).await {
                    self.close_all_connections(ConnectionCloseReason::Local);
                    break;
                }
            }
            // The precondition keeps this branch disabled when keepalive is off.
            _ = async {
                if let Some(interval) = keepalive_tick.as_mut() {
                    interval.tick().await;
                }
            }, if keepalive_tick.is_some() => {
                // `false` means a pong timed out or the ping could not be sent.
                if !self.handle_keepalive_tick(&mut keepalive_runtime).await {
                    self.close_all_connections(ConnectionCloseReason::Protocol);
                    break;
                }
            }
        }
    }

    // Every `break` above has already closed and cleared the connections, so
    // this call finds an empty map on those paths.
    self.close_all_connections(ConnectionCloseReason::SessionShutdown);
    trace!("session recv loop exited");
}
1332
/// Attempts to restore the session after the conduit has broken.
///
/// Returns `true` when a replacement conduit was installed (the resume
/// generation is bumped and `keepalive_runtime` is rebuilt), and `false` when
/// the session cannot continue.
///
/// Two recovery paths exist:
/// * If a `recoverer` is configured, it is asked for the next conduit,
///   optionally bounded by `recovery_timeout`. Every outcome of this path
///   returns, so the second path is never tried in that case.
/// * Otherwise, if the session is registered in the registry, the function
///   waits for an external resume request while still servicing control
///   requests and refusing open/close requests.
async fn handle_conduit_break(
    &mut self,
    keepalive_runtime: &mut Option<KeepaliveRuntime>,
) -> bool {
    if let Some(recoverer) = self.recoverer.as_mut() {
        let recovery_fut = recoverer.next_conduit(self.session_resume_key.as_ref());
        let recovery_result = match self.recovery_timeout {
            Some(timeout) => match vox_types::time::tokio::timeout(timeout, recovery_fut).await
            {
                Ok(r) => r,
                // Timed out waiting for a replacement conduit.
                Err(_) => return false,
            },
            None => recovery_fut.await,
        };
        match recovery_result {
            Ok(recovered) => {
                let result =
                    self.resume_from_handshake(recovered.tx, recovered.rx, recovered.handshake);
                match result {
                    Ok(()) => {
                        // Bump the generation so subscribers of the resume
                        // watch observe the change.
                        let next_generation = self.resume_notifier.borrow().wrapping_add(1);
                        let _ = self.resume_notifier.send(next_generation);
                        // Fresh keepalive schedule for the new conduit.
                        *keepalive_runtime = self.make_keepalive_runtime();
                        return true;
                    }
                    Err(_) => return false,
                }
            }
            Err(_) => return false,
        }
    }

    // Without a recoverer, only registry-registered sessions can be resumed
    // from the outside.
    if !self.registered_in_registry {
        return false;
    }

    loop {
        tokio::select! {
            Some(req) = self.resume_rx.recv() => {
                let result =
                    self.resume_from_handshake(req.tx, req.rx, req.handshake_result);
                let ok = result.is_ok();
                // Report the outcome to the requester either way; a failed
                // attempt keeps waiting for the next one.
                let _ = req.result_tx.send(result);
                if ok {
                    let next_generation = self.resume_notifier.borrow().wrapping_add(1);
                    let _ = self.resume_notifier.send(next_generation);
                    *keepalive_runtime = self.make_keepalive_runtime();
                    return true;
                }
            }
            Some(req) = self.control_rx.recv() => {
                if !self.handle_drop_control_request(req).await {
                    return false;
                }
            }
            Some(req) = self.open_rx.recv() => {
                let _ = req.result_tx.send(Err(SessionError::Protocol(
                    "session is disconnected; resume before opening connections".into(),
                )));
            }
            Some(req) = self.close_rx.recv() => {
                let _ = req.result_tx.send(Err(SessionError::Protocol(
                    "session is disconnected; resume before closing connections".into(),
                )));
            }
            // All request channels are closed: nobody can resume the session.
            else => return false,
        }
    }
}
1408
1409 fn resume_from_handshake(
1411 &mut self,
1412 tx: Arc<dyn DynConduitTx>,
1413 rx: Box<dyn DynConduitRx>,
1414 result: HandshakeResult,
1415 ) -> Result<(), SessionError> {
1416 let Some(peer_settings) = self.peer_root_settings.clone() else {
1417 return Err(SessionError::Protocol("missing peer root settings".into()));
1418 };
1419
1420 if result.our_settings != self.local_root_settings {
1421 return Err(SessionError::Protocol(
1422 "local root settings changed across session resume".into(),
1423 ));
1424 }
1425
1426 if result.peer_settings != peer_settings {
1427 return Err(SessionError::Protocol(
1428 "peer root settings changed across session resume".into(),
1429 ));
1430 }
1431
1432 self.peer_supports_retry = result.peer_supports_retry;
1433 self.session_resume_key = result.session_resume_key.or(self.session_resume_key);
1434
1435 self.sess_core.replace_tx_and_reset_schemas(tx);
1436 self.rx = rx;
1437 if let Some(ConnectionSlot::Active(state)) = self.conns.get_mut(&ConnectionId::ROOT) {
1440 state.schema_recv_tracker = Arc::new(vox_types::SchemaRecvTracker::new());
1441 }
1442 Ok(())
1443 }
1444
/// Dispatches one inbound message from the conduit.
///
/// Ping, Pong and standalone schema messages are handled inline and never
/// reach a connection. Connection lifecycle messages (close/open/accept/
/// reject) are routed to their handlers. Request and channel messages are
/// forwarded to the owning connection's inbound queue; messages for a
/// connection that is not `Active` are dropped silently.
async fn handle_message(
    &mut self,
    msg: SelfRef<Message<'static>>,
    keepalive_runtime: &mut Option<KeepaliveRuntime>,
) {
    let msg_ref = msg.get();
    let conn_id = msg_ref.connection_id;
    // First pass: payloads that can be handled from a borrowed view.
    match &msg_ref.payload {
        MessagePayload::Ping(ping) => {
            // Echo the nonce back on the same connection id; a send failure is ignored.
            let _ = self
                .sess_core
                .send(
                    Message {
                        connection_id: conn_id,
                        payload: MessagePayload::Pong(vox_types::Pong { nonce: ping.nonce }),
                    },
                    None,
                    None,
                )
                .await;
            return;
        }
        MessagePayload::Pong(pong) => {
            // Only pongs on the root connection feed the keepalive state.
            if conn_id.is_root() {
                self.handle_keepalive_pong(pong.nonce, keepalive_runtime);
            }
            return;
        }
        MessagePayload::SchemaMessage(schema_msg) => {
            let schema_recv_tracker = match self.conns.get(&conn_id) {
                Some(ConnectionSlot::Active(state)) => Arc::clone(&state.schema_recv_tracker),
                _ => return,
            };
            // The result is deliberately ignored for standalone schema messages.
            let _ = self.record_received_schema_cbor(
                conn_id,
                schema_recv_tracker,
                schema_msg.method_id,
                schema_msg.direction,
                &schema_msg.schemas,
                "standalone schema message",
            );
            return;
        }
        _ => {}
    }
    // Second pass: payloads that need ownership of the message.
    vox_types::selfref_match!(msg, payload {
        MessagePayload::ConnectionClose(_) => {
            if conn_id.is_root() {
                warn!("received ConnectionClose for root connection");
            } else {
                trace!(conn_id = conn_id.0, "received ConnectionClose for virtual connection");
            }
            self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Remote);
            self.maybe_request_shutdown_after_root_closed();
        }
        MessagePayload::ConnectionOpen(open) => {
            self.handle_inbound_open(conn_id, open).await;
        }
        MessagePayload::ConnectionAccept(accept) => {
            self.handle_inbound_accept(conn_id, accept);
        }
        MessagePayload::ConnectionReject(reject) => {
            self.handle_inbound_reject(conn_id, reject);
        }
        MessagePayload::RequestMessage(r) => {
            let r_ref = r.get();
            vox_types::dlog!(
                "[session {:?}] recv request: conn={:?} req={:?} body={} method={:?}",
                self.role,
                conn_id,
                r_ref.id,
                match &r_ref.body {
                    RequestBody::Call(_) => "Call",
                    RequestBody::Response(_) => "Response",
                    RequestBody::Cancel(_) => "Cancel",
                },
                match &r_ref.body {
                    RequestBody::Call(call) => Some(call.method_id),
                    RequestBody::Response(_) | RequestBody::Cancel(_) => None,
                }
            );
            // Remember whether the schema branch below will consume the
            // outgoing-call entry, so it is not taken twice.
            let response_had_schema_payload = matches!(&r_ref.body, RequestBody::Response(resp) if !resp.schemas.is_empty());
            {
                let schemas_cbor = match &r_ref.body {
                    RequestBody::Call(call) => Some(&call.schemas),
                    RequestBody::Response(resp) => Some(&resp.schemas),
                    _ => None,
                };
                vox_types::dlog!(
                    "[schema] recv ({:?}): req={:?} body={} schemas_len={:?}",
                    self.role,
                    r_ref.id,
                    match &r_ref.body {
                        RequestBody::Call(_) => "Call",
                        RequestBody::Response(_) => "Response",
                        RequestBody::Cancel(_) => "Cancel",
                    },
                    schemas_cbor.map(|s| s.0.len())
                );
                // Requests for a connection that is not active are dropped here.
                let schema_recv_tracker = match self.conns.get(&conn_id) {
                    Some(ConnectionSlot::Active(state)) => {
                        Arc::clone(&state.schema_recv_tracker)
                    }
                    _ => return,
                };
                if let Some(schemas_cbor) = schemas_cbor
                    && !schemas_cbor.is_empty()
                {
                    let (method_id, direction) = match &r_ref.body {
                        RequestBody::Call(call) => {
                            (call.method_id, vox_types::BindingDirection::Args)
                        }
                        RequestBody::Response(_) => {
                            // A response carries no method id, so it is looked
                            // up from the call we sent; an unknown request id
                            // closes the connection as a protocol error.
                            let Some(method_id) =
                                self.sess_core.take_outgoing_call_method(conn_id, r_ref.id)
                            else {
                                self.close_connection_for_protocol_error(
                                    conn_id,
                                    format!(
                                        "response schemas for unknown inflight request {:?}",
                                        r_ref.id
                                    ),
                                );
                                return;
                            };
                            (method_id, vox_types::BindingDirection::Response)
                        }
                        // `schemas_cbor` is `None` for Cancel, so the enclosing
                        // `if let` never reaches this arm.
                        RequestBody::Cancel(_) => unreachable!(),
                    };
                    // A `false` result drops the message without dispatching it.
                    if !self.record_received_schema_cbor(
                        conn_id,
                        schema_recv_tracker,
                        method_id,
                        direction,
                        schemas_cbor,
                        "inlined request schemas",
                    ) {
                        return;
                    }
                }
            }
            // Responses without schemas still retire the outgoing-call entry.
            if matches!(&r_ref.body, RequestBody::Response(_)) && !response_had_schema_payload {
                let _ = self.sess_core.take_outgoing_call_method(conn_id, r_ref.id);
            }
            if let RequestBody::Call(call) = &r_ref.body {
                // Recorded so the later response can be matched to this method.
                self.sess_core.record_incoming_call(conn_id, r_ref.id, call.method_id);
            }
            let state = match self.conns.get(&conn_id) {
                Some(ConnectionSlot::Active(state)) => state,
                _ => return,
            };
            let conn_tx = state.conn_tx.clone();
            let request_id = r_ref.id;
            let body_kind = match &r_ref.body {
                RequestBody::Call(_) => "Call",
                RequestBody::Response(_) => "Response",
                RequestBody::Cancel(_) => "Cancel",
            };
            let recv_msg = RecvMessage {
                schemas: Arc::clone(&state.schema_recv_tracker),
                msg: r.map(ConnectionMessage::Request),
            };
            vox_types::dlog!(
                "[session {:?}] dispatch request: conn={:?} req={:?} body={}",
                self.role,
                conn_id,
                request_id,
                body_kind
            );
            // A closed inbound queue means the handle is gone; drop the slot.
            if conn_tx.send(recv_msg).await.is_err() {
                self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
                self.maybe_request_shutdown_after_root_closed();
            }
        }
        MessagePayload::ChannelMessage(c) => {
            let state = match self.conns.get(&conn_id) {
                Some(ConnectionSlot::Active(state)) => state,
                _ => return,
            };
            let conn_tx = state.conn_tx.clone();
            let recv_msg = RecvMessage {
                schemas: Arc::clone(&state.schema_recv_tracker),
                msg: c.map(ConnectionMessage::Channel),
            };
            // Same cleanup as for requests when the receiver has gone away.
            if conn_tx.send(recv_msg).await.is_err() {
                self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
                self.maybe_request_shutdown_after_root_closed();
            }
        }
    })
}
1644
1645 fn make_keepalive_runtime(&self) -> Option<KeepaliveRuntime> {
1646 let config = self.keepalive?;
1647 if config.ping_interval.is_zero() || config.pong_timeout.is_zero() {
1648 warn!("keepalive disabled due to non-positive interval/timeout");
1649 return None;
1650 }
1651 let now = vox_types::time::tokio::Instant::now();
1652 Some(KeepaliveRuntime {
1653 ping_interval: config.ping_interval,
1654 pong_timeout: config.pong_timeout,
1655 next_ping_at: now + config.ping_interval,
1656 waiting_pong_nonce: None,
1657 pong_deadline: now,
1658 next_ping_nonce: 1,
1659 })
1660 }
1661
1662 fn handle_keepalive_pong(&self, nonce: u64, keepalive_runtime: &mut Option<KeepaliveRuntime>) {
1663 let Some(runtime) = keepalive_runtime.as_mut() else {
1664 return;
1665 };
1666 if runtime.waiting_pong_nonce != Some(nonce) {
1667 return;
1668 }
1669 runtime.waiting_pong_nonce = None;
1670 runtime.next_ping_at = vox_types::time::tokio::Instant::now() + runtime.ping_interval;
1671 }
1672
1673 async fn handle_keepalive_tick(
1674 &mut self,
1675 keepalive_runtime: &mut Option<KeepaliveRuntime>,
1676 ) -> bool {
1677 let Some(runtime) = keepalive_runtime.as_mut() else {
1678 return true;
1679 };
1680 let now = vox_types::time::tokio::Instant::now();
1681
1682 if let Some(waiting_nonce) = runtime.waiting_pong_nonce {
1683 if now >= runtime.pong_deadline {
1684 warn!(
1685 nonce = waiting_nonce,
1686 timeout_ms = runtime.pong_timeout.as_millis(),
1687 "keepalive timeout waiting for pong"
1688 );
1689 return false;
1690 }
1691 return true;
1692 }
1693
1694 if now < runtime.next_ping_at {
1695 return true;
1696 }
1697
1698 let nonce = runtime.next_ping_nonce;
1699 if self
1700 .sess_core
1701 .send(
1702 Message {
1703 connection_id: ConnectionId::ROOT,
1704 payload: MessagePayload::Ping(vox_types::Ping { nonce }),
1705 },
1706 None,
1707 None,
1708 )
1709 .await
1710 .is_err()
1711 {
1712 warn!("failed to send keepalive ping");
1713 return false;
1714 }
1715
1716 runtime.waiting_pong_nonce = Some(nonce);
1717 runtime.pong_deadline = now + runtime.pong_timeout;
1718 runtime.next_ping_at = now + runtime.ping_interval;
1719 runtime.next_ping_nonce = runtime.next_ping_nonce.wrapping_add(1);
1720 true
1721 }
1722
1723 async fn handle_inbound_open(
1724 &mut self,
1725 conn_id: ConnectionId,
1726 open: SelfRef<ConnectionOpen<'static>>,
1727 ) {
1728 let peer_parity = self.parity.other();
1730 if !conn_id.has_parity(peer_parity) {
1731 let _ = self
1733 .sess_core
1734 .send(
1735 Message {
1736 connection_id: conn_id,
1737 payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1738 metadata: vec![],
1739 }),
1740 },
1741 None,
1742 None,
1743 )
1744 .await;
1745 return;
1746 }
1747
1748 if self.conns.contains_key(&conn_id) {
1750 let _ = self
1752 .sess_core
1753 .send(
1754 Message {
1755 connection_id: conn_id,
1756 payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1757 metadata: vec![],
1758 }),
1759 },
1760 None,
1761 None,
1762 )
1763 .await;
1764 return;
1765 }
1766
1767 if self.on_connection.is_none() {
1770 let _ = self
1771 .sess_core
1772 .send(
1773 Message {
1774 connection_id: conn_id,
1775 payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1776 metadata: vec![],
1777 }),
1778 },
1779 None,
1780 None,
1781 )
1782 .await;
1783 return;
1784 }
1785
1786 let open = open.get();
1788 if open.connection_settings.initial_channel_credit == 0 {
1789 let _ = self
1790 .sess_core
1791 .send(
1792 Message {
1793 connection_id: conn_id,
1794 payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1795 metadata: vec![vox_types::MetadataEntry::str(
1796 "error",
1797 "initial_channel_credit must be greater than zero",
1798 )],
1799 }),
1800 },
1801 None,
1802 None,
1803 )
1804 .await;
1805 return;
1806 }
1807
1808 let our_settings = ConnectionSettings {
1809 parity: open.connection_settings.parity.other(),
1810 max_concurrent_requests: open.connection_settings.max_concurrent_requests,
1811 initial_channel_credit: open.connection_settings.initial_channel_credit,
1812 };
1813
1814 let handle = self.make_connection_handle(
1816 conn_id,
1817 our_settings.clone(),
1818 open.connection_settings.clone(),
1819 );
1820
1821 let mut metadata: Vec<vox_types::MetadataEntry<'_>> = open.metadata.to_vec();
1823 metadata.push(vox_types::MetadataEntry::str(
1824 "vox-connection-kind",
1825 "virtual",
1826 ));
1827 let request = match ConnectionRequest::new(&metadata) {
1828 Ok(r) => r,
1829 Err(e) => {
1830 trace!(%conn_id, %e, "rejecting virtual connection");
1831 self.conns.remove(&conn_id);
1832 let _ = self
1833 .sess_core
1834 .send(
1835 Message {
1836 connection_id: conn_id,
1837 payload: MessagePayload::ConnectionReject(
1838 vox_types::ConnectionReject {
1839 metadata: vec![vox_types::MetadataEntry::str(
1840 "error",
1841 e.to_string(),
1842 )],
1843 },
1844 ),
1845 },
1846 None,
1847 None,
1848 )
1849 .await;
1850 return;
1851 }
1852 };
1853 let pending = PendingConnection::new(handle);
1854 let acceptor = self.on_connection.as_ref().unwrap();
1855 trace!(%conn_id, "calling acceptor for virtual connection");
1856 match acceptor.accept(&request, pending) {
1857 Ok(()) => {
1858 trace!(%conn_id, "acceptor accepted virtual connection, sending ConnectionAccept");
1859 let _ = self
1860 .sess_core
1861 .send(
1862 Message {
1863 connection_id: conn_id,
1864 payload: MessagePayload::ConnectionAccept(
1865 vox_types::ConnectionAccept {
1866 connection_settings: our_settings,
1867 metadata: vec![],
1868 },
1869 ),
1870 },
1871 None,
1872 None,
1873 )
1874 .await;
1875 }
1876 Err(reject_metadata) => {
1877 trace!(%conn_id, "acceptor rejected, removing conn slot");
1879 self.conns.remove(&conn_id);
1880 let _ = self
1881 .sess_core
1882 .send(
1883 Message {
1884 connection_id: conn_id,
1885 payload: MessagePayload::ConnectionReject(
1886 vox_types::ConnectionReject {
1887 metadata: reject_metadata,
1888 },
1889 ),
1890 },
1891 None,
1892 None,
1893 )
1894 .await;
1895 }
1896 }
1897 }
1898
1899 fn handle_inbound_accept(
1900 &mut self,
1901 conn_id: ConnectionId,
1902 accept: SelfRef<ConnectionAccept<'static>>,
1903 ) {
1904 let accept = accept.get();
1905 let slot = self.remove_connection(&conn_id);
1906 match slot {
1907 Some(ConnectionSlot::PendingOutbound(mut pending))
1908 if accept.connection_settings.initial_channel_credit == 0 =>
1909 {
1910 if let Some(tx) = pending.result_tx.take() {
1911 let _ = tx.send(Err(SessionError::Protocol(
1912 "initial_channel_credit must be greater than zero".into(),
1913 )));
1914 }
1915 }
1916 Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1917 let handle = self.make_connection_handle(
1918 conn_id,
1919 pending.local_settings.clone(),
1920 accept.connection_settings.clone(),
1921 );
1922
1923 if let Some(tx) = pending.result_tx.take() {
1924 let _ = tx.send(Ok(handle));
1925 }
1926 }
1927 Some(other) => {
1928 self.conns.insert(conn_id, other);
1930 }
1931 None => {
1932 }
1934 }
1935 }
1936
1937 fn handle_inbound_reject(
1938 &mut self,
1939 conn_id: ConnectionId,
1940 reject: SelfRef<ConnectionReject<'static>>,
1941 ) {
1942 let reject = reject.get();
1943 let slot = self.remove_connection(&conn_id);
1944 match slot {
1945 Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1946 if let Some(tx) = pending.result_tx.take() {
1947 let _ = tx.send(Err(SessionError::Rejected(vox_types::metadata_into_owned(
1948 reject.metadata.to_vec(),
1949 ))));
1950 }
1951 }
1952 Some(other) => {
1953 self.conns.insert(conn_id, other);
1954 }
1955 None => {}
1956 }
1957 }
1958
1959 async fn handle_open_request(&mut self, req: OpenRequest) {
1961 if req.settings.initial_channel_credit == 0 {
1962 let _ = req.result_tx.send(Err(SessionError::Protocol(
1963 "initial_channel_credit must be greater than zero".into(),
1964 )));
1965 return;
1966 }
1967
1968 let conn_id = self.conn_ids.alloc();
1969
1970 let send_result = self
1972 .sess_core
1973 .send(
1974 Message {
1975 connection_id: conn_id,
1976 payload: MessagePayload::ConnectionOpen(ConnectionOpen {
1977 connection_settings: req.settings.clone(),
1978 metadata: req.metadata,
1979 }),
1980 },
1981 None,
1982 None,
1983 )
1984 .await;
1985
1986 if send_result.is_err() {
1987 let _ = req.result_tx.send(Err(SessionError::Protocol(
1988 "failed to send ConnectionOpen".into(),
1989 )));
1990 return;
1991 }
1992
1993 self.conns.insert(
1996 conn_id,
1997 ConnectionSlot::PendingOutbound(PendingOutboundData {
1998 local_settings: req.settings,
1999 result_tx: Some(req.result_tx),
2000 }),
2001 );
2002 }
2003
2004 async fn handle_close_request(&mut self, req: CloseRequest) {
2006 if req.conn_id.is_root() {
2007 let _ = req.result_tx.send(Err(SessionError::Protocol(
2008 "cannot close root connection".into(),
2009 )));
2010 return;
2011 }
2012
2013 if self
2016 .remove_connection_with_reason(&req.conn_id, ConnectionCloseReason::Local)
2017 .is_none()
2018 {
2019 let _ = req
2020 .result_tx
2021 .send(Err(SessionError::Protocol("connection not found".into())));
2022 return;
2023 }
2024
2025 let send_result = self
2027 .sess_core
2028 .send(
2029 Message {
2030 connection_id: req.conn_id,
2031 payload: MessagePayload::ConnectionClose(ConnectionClose {
2032 metadata: req.metadata,
2033 }),
2034 },
2035 None,
2036 None,
2037 )
2038 .await;
2039
2040 if send_result.is_err() {
2041 let _ = req.result_tx.send(Err(SessionError::Protocol(
2042 "failed to send ConnectionClose".into(),
2043 )));
2044 return;
2045 }
2046
2047 let _ = req.result_tx.send(Ok(()));
2048 self.maybe_request_shutdown_after_root_closed();
2049 }
2050
2051 async fn handle_drop_control_request(&mut self, req: DropControlRequest) -> bool {
2052 match req {
2053 DropControlRequest::Shutdown => {
2054 trace!("session shutdown requested");
2055 false
2056 }
2057 DropControlRequest::Close(conn_id) => {
2058 if conn_id.is_root() {
2060 trace!("root callers dropped; internally closing root connection");
2062 self.root_closed_internal = true;
2063 return self.has_virtual_connections();
2065 }
2066
2067 if self
2068 .remove_connection_with_reason(&conn_id, ConnectionCloseReason::Local)
2069 .is_some()
2070 {
2071 let _ = self
2072 .sess_core
2073 .send(
2074 Message {
2075 connection_id: conn_id,
2076 payload: MessagePayload::ConnectionClose(ConnectionClose {
2077 metadata: vec![],
2078 }),
2079 },
2080 None,
2081 None,
2082 )
2083 .await;
2084 }
2085
2086 !self.root_closed_internal || self.has_virtual_connections()
2087 }
2088 }
2089 }
2090
2091 fn has_virtual_connections(&self) -> bool {
2092 self.conns.keys().any(|id| !id.is_root())
2093 }
2094
2095 fn remove_connection(&mut self, conn_id: &ConnectionId) -> Option<ConnectionSlot> {
2096 self.remove_connection_with_reason(conn_id, ConnectionCloseReason::Unknown)
2097 }
2098
2099 fn remove_connection_with_reason(
2100 &mut self,
2101 conn_id: &ConnectionId,
2102 reason: ConnectionCloseReason,
2103 ) -> Option<ConnectionSlot> {
2104 trace!(%conn_id, "remove_connection called");
2105 let slot = self.conns.remove(conn_id);
2106 if let Some(ConnectionSlot::Active(state)) = &slot {
2107 let _ = state.closed_tx.send(Some(reason));
2108 if let Some(observer) = &self.observer {
2109 observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2110 connection_id: *conn_id,
2111 reason,
2112 });
2113 }
2114 }
2115 slot
2116 }
2117
2118 fn close_all_connections(&mut self, reason: ConnectionCloseReason) {
2120 trace!(role = ?self.role, count = self.conns.len(), "close_all_connections");
2121 vox_types::dlog!(
2122 "[session {:?}] close_all_connections: {} slots",
2123 self.role,
2124 self.conns.len()
2125 );
2126 for (conn_id, slot) in self.conns.iter() {
2127 if let ConnectionSlot::Active(state) = slot {
2128 vox_types::dlog!("[session {:?}] closing connection {:?}", self.role, conn_id);
2129 let _ = state.closed_tx.send(Some(reason));
2130 if let Some(observer) = &self.observer {
2131 observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2132 connection_id: *conn_id,
2133 reason,
2134 });
2135 }
2136 }
2137 }
2138 self.conns.clear();
2139 }
2140
2141 fn maybe_request_shutdown_after_root_closed(&self) {
2142 if self.root_closed_internal && !self.has_virtual_connections() {
2143 let _ = send_drop_control(&self.control_tx, DropControlRequest::Shutdown);
2144 }
2145 }
2146}
2147
/// Shared sending side of a session.
///
/// Messages are turned into `OutboundBatch`es and pushed onto `outbound_tx`;
/// driver events are reported to `observer` when one is present.
pub(crate) struct SessionCore {
    /// Current conduit sender plus per-connection send state, behind a
    /// blocking mutex.
    inner: std::sync::Mutex<SessionCoreInner>,
    /// Bounded queue of prepared batches. Presumably drained by
    /// `run_outbound_worker` — the wiring is outside this view.
    outbound_tx: tokio_mpsc::Sender<OutboundBatch>,
    /// Optional sink for driver events (queue full/closed, encode errors).
    observer: Option<VoxObserverHandle>,
}
2153
/// Marker trait for the future that performs one outbound send.
///
/// It bundles the bounds needed to box such a future: it resolves to
/// `std::io::Result<()>`, satisfies `MaybeSend`, and is `'static`.
pub trait OutboundSendFuture: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
// Blanket impl: every future meeting the bounds is an `OutboundSendFuture`.
impl<T> OutboundSendFuture for T where T: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}

/// Boxed, pinned outbound send future as stored in an `OutboundBatch`.
type OutboundSend = Pin<Box<dyn OutboundSendFuture>>;
2158
/// A schema transmission planned for a method binding, to be sent by the
/// outbound worker ahead of the payload it accompanies.
#[derive(Clone)]
struct PendingSchemaSend {
    /// Method the schemas belong to.
    method_id: vox_types::MethodId,
    /// Which side of the method (arguments or response) the schemas describe.
    direction: vox_types::BindingDirection,
    /// Prepared plan handed to the send tracker to preview and later mark as sent.
    prepared: vox_types::PreparedSchemaPlan,
}
2165
/// One unit of work for the outbound worker: optional schema messages
/// followed by the payload message, plus a channel to report the outcome.
struct OutboundBatch {
    /// Connection the schema messages are addressed to.
    conn_id: ConnectionId,
    /// Send-side state of that connection (schema tracker and bookkeeping).
    conn_state: Arc<std::sync::Mutex<SendConnState>>,
    /// Conduit sender used to prepare the schema messages.
    tx: Arc<dyn DynConduitTx>,
    /// Schema transmissions to attempt, in order, before the payload.
    schema_sends: Vec<PendingSchemaSend>,
    /// Already-prepared send of the payload message itself.
    payload_send: OutboundSend,
    /// Receives the overall result of the batch.
    result_tx: tokio_oneshot::Sender<std::io::Result<()>>,
}
2174
2175async fn run_outbound_worker(mut rx: tokio_mpsc::Receiver<OutboundBatch>) {
2176 while let Some(batch) = rx.recv().await {
2177 let mut result = Ok(());
2178 for schema_send in batch.schema_sends {
2179 let schemas = {
2180 let mut conn_state = batch
2181 .conn_state
2182 .lock()
2183 .expect("send conn state mutex poisoned");
2184 conn_state.send_tracker.preview_prepared_plan(
2185 schema_send.method_id,
2186 schema_send.direction,
2187 &schema_send.prepared,
2188 )
2189 };
2190 if schemas.is_empty() {
2191 continue;
2192 }
2193
2194 let schema_msg = Message {
2195 connection_id: batch.conn_id,
2196 payload: MessagePayload::SchemaMessage(SchemaMessage {
2197 method_id: schema_send.method_id,
2198 direction: schema_send.direction,
2199 schemas,
2200 }),
2201 };
2202 let send = match batch.tx.clone().prepare_msg(schema_msg, None) {
2203 Ok(send) => send,
2204 Err(error) => {
2205 result = Err(error);
2206 break;
2207 }
2208 };
2209 if let Err(error) = send.await {
2210 result = Err(error);
2211 break;
2212 }
2213 let mut conn_state = batch
2214 .conn_state
2215 .lock()
2216 .expect("send conn state mutex poisoned");
2217 conn_state.send_tracker.mark_prepared_plan_sent(
2218 schema_send.method_id,
2219 schema_send.direction,
2220 &schema_send.prepared,
2221 );
2222 conn_state
2223 .planned_bindings
2224 .remove(&(schema_send.direction, schema_send.method_id));
2225 }
2226 if result.is_ok()
2227 && let Err(error) = batch.payload_send.await
2228 {
2229 result = Err(error);
2230 }
2231 let _ = batch.result_tx.send(result);
2232 }
2233}
2234
2235#[cfg(not(target_arch = "wasm32"))]
2236fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
2237 if tokio::runtime::Handle::try_current().is_ok() {
2238 tokio::spawn(run_outbound_worker(rx));
2239 return;
2240 }
2241
2242 std::thread::spawn(move || {
2243 let runtime = tokio::runtime::Builder::new_current_thread()
2244 .enable_all()
2245 .build()
2246 .expect("build outbound worker runtime");
2247 runtime.block_on(run_outbound_worker(rx));
2248 });
2249}
2250
/// Starts the outbound worker on wasm32 by spawning it as a local future.
#[cfg(target_arch = "wasm32")]
fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
    let worker = run_outbound_worker(rx);
    wasm_bindgen_futures::spawn_local(worker);
}
2255
/// Send-side bookkeeping for a single connection.
struct SendConnState {
    /// Tracks which schemas have been sent on this connection.
    send_tracker: vox_types::SchemaSendTracker,

    /// Calls received from the peer, keyed by request id, so the method of a
    /// later response can be looked up.
    inflight_incoming: HashMap<RequestId, vox_types::MethodId>,

    /// Presumably the mirror image for calls sent to the peer — the code that
    /// fills it is outside this view; verify against `take_outgoing_call_method`.
    inflight_outgoing: HashMap<RequestId, vox_types::MethodId>,

    /// Schema plans per (direction, method). The outbound worker removes an
    /// entry once its schemas have been sent; where entries are inserted is
    /// not visible here.
    planned_bindings:
        HashMap<(vox_types::BindingDirection, vox_types::MethodId), vox_types::PreparedSchemaPlan>,
}
2272
2273impl SendConnState {
2274 fn new() -> Self {
2275 SendConnState {
2276 send_tracker: vox_types::SchemaSendTracker::new(),
2277 inflight_incoming: HashMap::new(),
2278 inflight_outgoing: HashMap::new(),
2279 planned_bindings: HashMap::new(),
2280 }
2281 }
2282}
2283
/// Mutable state of `SessionCore`, kept behind its mutex.
struct SessionCoreInner {
    /// Conduit sender currently in use; cloned into every outbound batch.
    tx: Arc<dyn DynConduitTx>,

    /// Per-connection send state, created lazily on first use.
    conns: HashMap<ConnectionId, Arc<std::sync::Mutex<SendConnState>>>,
}
2291
2292fn get_or_create_send_conn_state(
2293 inner: &mut SessionCoreInner,
2294 conn_id: ConnectionId,
2295) -> Arc<std::sync::Mutex<SendConnState>> {
2296 inner
2297 .conns
2298 .entry(conn_id)
2299 .or_insert_with(|| Arc::new(std::sync::Mutex::new(SendConnState::new())))
2300 .clone()
2301}
2302
2303impl SessionCore {
2304 pub(crate) fn outbound_queue_stats(&self) -> (usize, usize) {
2305 let capacity = self.outbound_tx.max_capacity();
2306 let available = self.outbound_tx.capacity();
2307 (capacity.saturating_sub(available), capacity)
2308 }
2309
2310 fn prepare_outbound_batch<'a>(
2311 &self,
2312 mut msg: Message<'a>,
2313 binder: Option<&'a dyn vox_types::ChannelBinder>,
2314 forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2315 ) -> Result<(OutboundBatch, tokio_oneshot::Receiver<std::io::Result<()>>), ()> {
2316 let conn_id = msg.connection_id;
2317 let (tx, conn_state, schema_sends) = {
2318 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2319 let tx = inner.tx.clone();
2320 let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2321 drop(inner);
2322
2323 if let MessagePayload::RequestMessage(req) = &mut msg.payload {
2324 vox_types::dlog!(
2325 "[session-core] send request: conn={:?} req={:?} body={} forwarded={}",
2326 conn_id,
2327 req.id,
2328 match &req.body {
2329 RequestBody::Call(_) => "Call",
2330 RequestBody::Response(_) => "Response",
2331 RequestBody::Cancel(_) => "Cancel",
2332 },
2333 forwarded_schemas.is_some()
2334 );
2335 let schema_sends = {
2336 let mut conn_state_guard =
2337 conn_state.lock().expect("send conn state mutex poisoned");
2338 let mut schema_sends = Vec::new();
2339 match &mut req.body {
2340 RequestBody::Call(call) => {
2341 if let Some(schema_send) = Self::plan_call_schema_send(
2342 &mut conn_state_guard,
2343 req.id,
2344 call.method_id,
2345 call,
2346 forwarded_schemas,
2347 ) {
2348 schema_sends.push(schema_send);
2349 }
2350 call.schemas = Default::default();
2351 }
2352 RequestBody::Response(resp) => {
2353 if let Some(method_id) =
2354 conn_state_guard.inflight_incoming.remove(&req.id)
2355 && let Some(schema_send) = Self::plan_response_schema_send(
2356 &mut conn_state_guard,
2357 req.id,
2358 method_id,
2359 resp,
2360 forwarded_schemas,
2361 )
2362 {
2363 schema_sends.push(schema_send);
2364 }
2365 resp.schemas = Default::default();
2366 }
2367 RequestBody::Cancel(_) => {}
2368 }
2369 schema_sends
2370 };
2371 (tx, conn_state, schema_sends)
2372 } else {
2373 (tx, conn_state, Vec::new())
2374 }
2375 };
2376 let payload_send = tx.clone().prepare_msg(msg, binder).map_err(|_| ())?;
2377
2378 let (result_tx, result_rx) = tokio_oneshot::channel();
2379 Ok((
2380 OutboundBatch {
2381 conn_id,
2382 conn_state,
2383 tx,
2384 schema_sends,
2385 payload_send,
2386 result_tx,
2387 },
2388 result_rx,
2389 ))
2390 }
2391
2392 pub(crate) async fn send<'a>(
2394 &self,
2395 msg: Message<'a>,
2396 binder: Option<&'a dyn vox_types::ChannelBinder>,
2397 forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2398 ) -> Result<(), ()> {
2399 let connection_id = msg.connection_id;
2400 let (batch, result_rx) = self.prepare_outbound_batch(msg, binder, forwarded_schemas)?;
2401 if self.outbound_tx.send(batch).await.is_err() {
2402 if let Some(observer) = &self.observer {
2403 observer
2404 .driver_event(vox_types::DriverEvent::OutboundQueueClosed { connection_id });
2405 }
2406 return Err(());
2407 }
2408 let result = result_rx.await.map_err(|_| ());
2409 match result? {
2410 Ok(()) => Ok(()),
2411 Err(_) => {
2412 if let Some(observer) = &self.observer {
2413 observer.driver_event(vox_types::DriverEvent::EncodeError {
2414 connection_id,
2415 kind: vox_types::EncodeErrorKind::Transport,
2416 });
2417 }
2418 Err(())
2419 }
2420 }
2421 }
2422
2423 pub(crate) fn try_send<'a>(
2425 &self,
2426 msg: Message<'a>,
2427 binder: Option<&'a dyn vox_types::ChannelBinder>,
2428 forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2429 ) -> Result<(), TrySendError<()>> {
2430 let connection_id = msg.connection_id;
2431 let (batch, _result_rx) = self
2432 .prepare_outbound_batch(msg, binder, forwarded_schemas)
2433 .map_err(|_| TrySendError::Closed(()))?;
2434 self.outbound_tx.try_send(batch).map_err(|err| match err {
2435 tokio_mpsc::error::TrySendError::Full(_) => {
2436 if let Some(observer) = &self.observer {
2437 observer
2438 .driver_event(vox_types::DriverEvent::OutboundQueueFull { connection_id });
2439 }
2440 TrySendError::Full(())
2441 }
2442 tokio_mpsc::error::TrySendError::Closed(_) => {
2443 if let Some(observer) = &self.observer {
2444 observer.driver_event(vox_types::DriverEvent::OutboundQueueClosed {
2445 connection_id,
2446 });
2447 }
2448 TrySendError::Closed(())
2449 }
2450 })
2451 }
2452
2453 pub(crate) fn record_incoming_call(
2456 &self,
2457 conn_id: ConnectionId,
2458 request_id: RequestId,
2459 method_id: vox_types::MethodId,
2460 ) {
2461 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2462 let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2463 vox_types::dlog!(
2464 "[schema] record_incoming_call: conn={:?} req={:?} method={:?}",
2465 conn_id,
2466 request_id,
2467 method_id
2468 );
2469 conn_state
2470 .lock()
2471 .expect("send conn state mutex poisoned")
2472 .inflight_incoming
2473 .insert(request_id, method_id);
2474 }
2475
2476 pub(crate) fn take_outgoing_call_method(
2477 &self,
2478 conn_id: ConnectionId,
2479 request_id: RequestId,
2480 ) -> Option<vox_types::MethodId> {
2481 let inner = self.inner.lock().expect("session core mutex poisoned");
2482 inner.conns.get(&conn_id).and_then(|conn_state| {
2483 conn_state
2484 .lock()
2485 .expect("send conn state mutex poisoned")
2486 .inflight_outgoing
2487 .remove(&request_id)
2488 })
2489 }
2490
2491 pub(crate) fn prepare_response_for_method(
2492 &self,
2493 conn_id: ConnectionId,
2494 request_id: RequestId,
2495 method_id: vox_types::MethodId,
2496 response: &mut RequestResponse<'_>,
2497 ) {
2498 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2499 let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2500 let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2501 let key = (vox_types::BindingDirection::Response, method_id);
2502 if conn_state
2503 .send_tracker
2504 .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2505 {
2506 response.schemas = Default::default();
2507 return;
2508 }
2509
2510 let prepared = match &response.ret {
2511 vox_types::Payload::Value { shape, .. } => {
2512 match Self::get_or_plan_binding_for_shape(
2513 &mut conn_state,
2514 key,
2515 request_id,
2516 "response",
2517 shape,
2518 ) {
2519 Some(prepared) => prepared,
2520 None => return,
2521 }
2522 }
2523 vox_types::Payload::PostcardBytes(_) => {
2524 tracing::error!(
2525 "schema attachment failed: missing forwarded response schemas for method {:?}",
2526 method_id
2527 );
2528 return;
2529 }
2530 };
2531 response.schemas = prepared.to_cbor();
2532 }
2533
2534 pub(crate) fn prepare_response_from_source(
2536 &self,
2537 conn_id: ConnectionId,
2538 _request_id: RequestId,
2539 method_id: vox_types::MethodId,
2540 root_type: &vox_types::TypeRef,
2541 source: &dyn vox_types::SchemaSource,
2542 response: &mut RequestResponse<'_>,
2543 ) {
2544 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2545 let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2546 let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2547 let key = (vox_types::BindingDirection::Response, method_id);
2548 if conn_state
2549 .send_tracker
2550 .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2551 {
2552 response.schemas = Default::default();
2553 return;
2554 }
2555 let prepared =
2556 Self::get_or_plan_binding_from_source(&mut conn_state, key, root_type, source);
2557 response.schemas = prepared.to_cbor();
2558 }
2559
2560 pub(crate) fn prepare_response_from_shape(
2564 &self,
2565 conn_id: ConnectionId,
2566 request_id: RequestId,
2567 method_id: vox_types::MethodId,
2568 response_shape: &'static Shape,
2569 response: &mut RequestResponse<'_>,
2570 ) {
2571 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2572 let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2573 let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2574 let key = (vox_types::BindingDirection::Response, method_id);
2575 if conn_state
2576 .send_tracker
2577 .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2578 {
2579 response.schemas = Default::default();
2580 return;
2581 }
2582 let prepared = match Self::get_or_plan_binding_for_shape(
2583 &mut conn_state,
2584 key,
2585 request_id,
2586 "response",
2587 response_shape,
2588 ) {
2589 Some(prepared) => prepared,
2590 None => return,
2591 };
2592 response.schemas = prepared.to_cbor();
2593 }
2594
2595 fn get_or_plan_binding_for_shape(
2596 conn_state: &mut SendConnState,
2597 key: (vox_types::BindingDirection, vox_types::MethodId),
2598 request_id: RequestId,
2599 kind: &str,
2600 shape: &'static Shape,
2601 ) -> Option<vox_types::PreparedSchemaPlan> {
2602 if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2603 return Some(prepared.clone());
2604 }
2605 match vox_types::SchemaSendTracker::plan_for_shape(shape) {
2606 Ok(prepared) => {
2607 vox_types::dlog!(
2608 "[schema] planned {} {} schemas for method {:?} (req {:?})",
2609 prepared.schemas.len(),
2610 kind,
2611 key.1,
2612 request_id
2613 );
2614 conn_state.planned_bindings.insert(key, prepared.clone());
2615 Some(prepared)
2616 }
2617 Err(e) => {
2618 tracing::error!("schema extraction failed: {e}");
2619 None
2620 }
2621 }
2622 }
2623
2624 fn get_or_plan_binding_from_source(
2625 conn_state: &mut SendConnState,
2626 key: (vox_types::BindingDirection, vox_types::MethodId),
2627 root_type: &vox_types::TypeRef,
2628 source: &dyn vox_types::SchemaSource,
2629 ) -> vox_types::PreparedSchemaPlan {
2630 if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2631 return prepared.clone();
2632 }
2633 let prepared = vox_types::SchemaSendTracker::plan_from_source(root_type, source);
2634 conn_state.planned_bindings.insert(key, prepared.clone());
2635 prepared
2636 }
2637
2638 fn plan_response_schema_send(
2639 conn_state: &mut SendConnState,
2640 request_id: RequestId,
2641 method_id: vox_types::MethodId,
2642 response: &mut RequestResponse<'_>,
2643 forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2644 ) -> Option<PendingSchemaSend> {
2645 if conn_state
2646 .send_tracker
2647 .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2648 {
2649 response.schemas = Default::default();
2650 return None;
2651 }
2652
2653 let key = (vox_types::BindingDirection::Response, method_id);
2654 let prepared = if !response.schemas.is_empty() {
2655 conn_state
2656 .planned_bindings
2657 .get(&key)
2658 .cloned()
2659 .unwrap_or_else(|| {
2660 let prepared_payload = vox_types::SchemaPayload::from_cbor(&response.schemas.0)
2661 .expect("prepared schema payloads must be valid CBOR");
2662 vox_types::PreparedSchemaPlan {
2663 schemas: prepared_payload.schemas,
2664 root: prepared_payload.root,
2665 }
2666 })
2667 } else {
2668 match &response.ret {
2669 vox_types::Payload::Value { shape, .. } => Self::get_or_plan_binding_for_shape(
2670 conn_state, key, request_id, "response", shape,
2671 )?,
2672 vox_types::Payload::PostcardBytes(_) => {
2673 let Some(source) = forwarded_schemas else {
2674 tracing::error!(
2675 "schema attachment failed: missing forwarded response schemas for method {:?}",
2676 method_id
2677 );
2678 return None;
2679 };
2680 let Some(root) = source.get_remote_response_root(method_id) else {
2681 tracing::error!(
2682 "schema attachment failed: missing forwarded response root for method {:?}",
2683 method_id
2684 );
2685 return None;
2686 };
2687 Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2688 }
2689 }
2690 };
2691
2692 Some(PendingSchemaSend {
2693 method_id,
2694 direction: vox_types::BindingDirection::Response,
2695 prepared,
2696 })
2697 }
2698
2699 fn plan_call_schema_send(
2700 conn_state: &mut SendConnState,
2701 request_id: RequestId,
2702 method_id: vox_types::MethodId,
2703 call: &mut vox_types::RequestCall<'_>,
2704 forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2705 ) -> Option<PendingSchemaSend> {
2706 conn_state.inflight_outgoing.insert(request_id, method_id);
2707 if conn_state
2708 .send_tracker
2709 .has_sent_binding(method_id, vox_types::BindingDirection::Args)
2710 {
2711 call.schemas = Default::default();
2712 return None;
2713 }
2714
2715 let key = (vox_types::BindingDirection::Args, method_id);
2716 let prepared = match &call.args {
2717 vox_types::Payload::Value { shape, .. } => {
2718 Self::get_or_plan_binding_for_shape(conn_state, key, request_id, "args", shape)?
2719 }
2720 vox_types::Payload::PostcardBytes(_) => {
2721 let Some(source) = forwarded_schemas else {
2722 tracing::error!(
2723 "schema attachment failed: missing forwarded args schemas for method {:?}",
2724 method_id
2725 );
2726 return None;
2727 };
2728 let Some(root) = source.get_remote_args_root(method_id) else {
2729 tracing::error!(
2730 "schema attachment failed: missing forwarded args root for method {:?}",
2731 method_id
2732 );
2733 return None;
2734 };
2735 Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2736 }
2737 };
2738
2739 Some(PendingSchemaSend {
2740 method_id,
2741 direction: vox_types::BindingDirection::Args,
2742 prepared,
2743 })
2744 }
2745
2746 fn replace_tx_and_reset_schemas(&self, tx: Arc<dyn DynConduitTx>) {
2747 let mut inner = self.inner.lock().expect("session core mutex poisoned");
2748 inner.tx = tx;
2749 inner.conns.clear();
2750 }
2751}
2752
2753pub(crate) struct RecoveredConduit {
2754 pub tx: Arc<dyn DynConduitTx>,
2755 pub rx: Box<dyn DynConduitRx>,
2756 pub handshake: HandshakeResult,
2757}
2758
2759pub(crate) trait ConduitRecoverer: MaybeSend {
2760 fn next_conduit<'a>(
2761 &'a mut self,
2762 resume_key: Option<&'a SessionResumeKey>,
2763 ) -> BoxFut<'a, Result<RecoveredConduit, SessionError>>;
2764}
2765
2766pub trait DynConduitTx: MaybeSend + MaybeSync {
2767 fn prepare_msg<'a>(
2768 self: Arc<Self>,
2769 msg: Message<'a>,
2770 binder: Option<&'a dyn vox_types::ChannelBinder>,
2771 ) -> std::io::Result<OutboundSend>;
2772}
2773pub trait DynConduitRx: MaybeSend {
2774 fn recv_msg<'a>(&'a mut self)
2775 -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>>;
2776}
2777
2778impl<T> DynConduitTx for T
2781where
2782 T: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
2783{
2784 fn prepare_msg<'a>(
2785 self: Arc<Self>,
2786 msg: Message<'a>,
2787 binder: Option<&'a dyn vox_types::ChannelBinder>,
2788 ) -> std::io::Result<OutboundSend> {
2789 let prepared = if let Some(binder) = binder {
2790 vox_types::with_channel_binder(binder, || self.prepare_send(msg))
2791 } else {
2792 self.prepare_send(msg)
2793 };
2794 let prepared = prepared.map_err(|e| std::io::Error::other(e.to_string()))?;
2795 Ok(Box::pin(async move {
2796 self.send_prepared(prepared)
2797 .await
2798 .map_err(|e| std::io::Error::other(e.to_string()))
2799 }))
2800 }
2801}
2802
2803impl<T> DynConduitRx for T
2804where
2805 T: ConduitRx<Msg = MessageFamily> + MaybeSend,
2806{
2807 fn recv_msg<'a>(
2808 &'a mut self,
2809 ) -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>> {
2810 Box::pin(async move {
2811 self.recv()
2812 .await
2813 .map_err(|error| std::io::Error::other(error.to_string()))
2814 })
2815 }
2816}
2817
2818#[cfg(test)]
2819mod tests {
2820 use moire::sync::mpsc;
2821 use vox_types::{
2822 Backing, Conduit, ConnectionAccept, ConnectionReject, HandshakeResult, SelfRef,
2823 };
2824
2825 use super::*;
2826
2827 fn make_session() -> Session {
2828 let (a, b) = crate::memory_link_pair(32);
2829 std::mem::forget(b);
2831 let conduit = crate::BareConduit::new(a);
2832 let (tx, rx) = conduit.split();
2833 let (_open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open.test", 4);
2834 let (_close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close.test", 4);
2835 let (_resume_tx, resume_rx) = mpsc::channel::<ResumeRequest>("session.resume.test", 1);
2836 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control.test");
2837 Session::pre_handshake(
2838 tx, rx, None, open_rx, close_rx, resume_rx, control_tx, control_rx, None, false, None,
2839 None, None,
2840 )
2841 }
2842
2843 fn resumed_handshake(
2844 our_settings: ConnectionSettings,
2845 peer_settings: ConnectionSettings,
2846 ) -> HandshakeResult {
2847 HandshakeResult {
2848 role: SessionRole::Initiator,
2849 our_settings,
2850 peer_settings,
2851 peer_supports_retry: true,
2852 session_resume_key: Some(SessionResumeKey([7; 16])),
2853 peer_resume_key: None,
2854 our_schema: vec![],
2855 peer_schema: vec![],
2856 peer_metadata: vec![],
2857 }
2858 }
2859
2860 fn accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2861 SelfRef::owning(
2862 Backing::Boxed(Box::<[u8]>::default()),
2863 ConnectionAccept {
2864 connection_settings: ConnectionSettings {
2865 parity: Parity::Even,
2866 max_concurrent_requests: 64,
2867 initial_channel_credit: 16,
2868 },
2869 metadata: vec![],
2870 },
2871 )
2872 }
2873
2874 fn zero_credit_accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2875 SelfRef::owning(
2876 Backing::Boxed(Box::<[u8]>::default()),
2877 ConnectionAccept {
2878 connection_settings: ConnectionSettings {
2879 parity: Parity::Even,
2880 max_concurrent_requests: 64,
2881 initial_channel_credit: 0,
2882 },
2883 metadata: vec![],
2884 },
2885 )
2886 }
2887
2888 fn reject_ref() -> SelfRef<ConnectionReject<'static>> {
2889 SelfRef::owning(
2890 Backing::Boxed(Box::<[u8]>::default()),
2891 ConnectionReject { metadata: vec![] },
2892 )
2893 }
2894
2895 #[tokio::test]
2896 async fn duplicate_connection_accept_is_ignored_after_first() {
2897 let mut session = make_session();
2898 let conn_id = ConnectionId(1);
2899 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2900
2901 session.conns.insert(
2902 conn_id,
2903 ConnectionSlot::PendingOutbound(PendingOutboundData {
2904 local_settings: ConnectionSettings {
2905 parity: Parity::Odd,
2906 max_concurrent_requests: 64,
2907 initial_channel_credit: 16,
2908 },
2909 result_tx: Some(result_tx),
2910 }),
2911 );
2912
2913 session.handle_inbound_accept(conn_id, accept_ref());
2914 let handle = result_rx
2915 .await
2916 .expect("pending outbound result should resolve")
2917 .expect("accept should resolve as Ok");
2918 assert_eq!(handle.connection_id(), conn_id);
2919
2920 session.handle_inbound_accept(conn_id, accept_ref());
2921 assert!(
2922 matches!(
2923 session.conns.get(&conn_id),
2924 Some(ConnectionSlot::Active(ConnectionState { id, .. })) if *id == conn_id
2925 ),
2926 "duplicate accept should keep existing active connection state"
2927 );
2928 }
2929
2930 #[tokio::test]
2931 async fn duplicate_connection_reject_is_ignored_after_first() {
2932 let mut session = make_session();
2933 let conn_id = ConnectionId(1);
2934 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2935
2936 session.conns.insert(
2937 conn_id,
2938 ConnectionSlot::PendingOutbound(PendingOutboundData {
2939 local_settings: ConnectionSettings {
2940 parity: Parity::Odd,
2941 max_concurrent_requests: 64,
2942 initial_channel_credit: 16,
2943 },
2944 result_tx: Some(result_tx),
2945 }),
2946 );
2947
2948 session.handle_inbound_reject(conn_id, reject_ref());
2949 let result = result_rx
2950 .await
2951 .expect("pending outbound result should resolve");
2952 assert!(
2953 matches!(result, Err(SessionError::Rejected(_))),
2954 "expected rejection, got: {result:?}"
2955 );
2956
2957 session.handle_inbound_reject(conn_id, reject_ref());
2958 assert!(
2959 !session.conns.contains_key(&conn_id),
2960 "duplicate reject should not recreate connection state"
2961 );
2962 }
2963
2964 #[tokio::test]
2966 async fn inbound_accept_with_zero_initial_credit_rejects_pending_open() {
2967 let mut session = make_session();
2968 let conn_id = ConnectionId(1);
2969 let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2970
2971 session.conns.insert(
2972 conn_id,
2973 ConnectionSlot::PendingOutbound(PendingOutboundData {
2974 local_settings: ConnectionSettings {
2975 parity: Parity::Odd,
2976 max_concurrent_requests: 64,
2977 initial_channel_credit: 16,
2978 },
2979 result_tx: Some(result_tx),
2980 }),
2981 );
2982
2983 session.handle_inbound_accept(conn_id, zero_credit_accept_ref());
2984 let result = result_rx
2985 .await
2986 .expect("pending outbound result should resolve");
2987 assert!(
2988 matches!(
2989 result,
2990 Err(SessionError::Protocol(ref message))
2991 if message == "initial_channel_credit must be greater than zero"
2992 ),
2993 "expected zero-credit protocol error, got: {result:?}"
2994 );
2995 assert!(
2996 !session.conns.contains_key(&conn_id),
2997 "zero-credit accept should not create an active connection"
2998 );
2999 }
3000
/// Accept/reject messages for a connection id with no pending open must leave
/// the (empty) connection table untouched.
#[test]
fn out_of_order_accept_or_reject_without_pending_is_ignored() {
    let mut session = make_session();
    let unknown_conn = ConnectionId(99);

    session.handle_inbound_accept(unknown_conn, accept_ref());
    session.handle_inbound_reject(unknown_conn, reject_ref());

    let table_is_empty = session.conns.is_empty();
    assert!(
        table_is_empty,
        "out-of-order accept/reject should not mutate empty connection table"
    );
}
3014
3015 #[tokio::test]
3016 async fn close_request_clears_pending_outbound_open() {
3017 let mut session = make_session();
3018 let (open_result_tx, open_result_rx) = moire::sync::oneshot::channel("session.open.result");
3019 let (close_result_tx, close_result_rx) =
3020 moire::sync::oneshot::channel("session.close.result");
3021
3022 session.conns.insert(
3023 ConnectionId(1),
3024 ConnectionSlot::PendingOutbound(PendingOutboundData {
3025 local_settings: ConnectionSettings {
3026 parity: Parity::Odd,
3027 max_concurrent_requests: 64,
3028 initial_channel_credit: 16,
3029 },
3030 result_tx: Some(open_result_tx),
3031 }),
3032 );
3033
3034 session
3035 .handle_close_request(CloseRequest {
3036 conn_id: ConnectionId(1),
3037 metadata: vec![],
3038 result_tx: close_result_tx,
3039 })
3040 .await;
3041
3042 let close_result = close_result_rx
3043 .await
3044 .expect("close result should be delivered");
3045 assert!(
3046 close_result.is_ok(),
3047 "close should succeed for pending outbound connection"
3048 );
3049
3050 assert!(
3051 open_result_rx.await.is_err(),
3052 "pending open result channel should be closed once the pending slot is removed"
3053 );
3054 }
3055
/// Resuming with local root settings that differ from the ones the session was
/// established with must fail with a protocol error.
#[test]
fn resume_rejects_changed_local_root_settings() {
    let mut session = make_session();
    let local_settings = ConnectionSettings {
        parity: Parity::Odd,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };
    let peer_settings = ConnectionSettings {
        parity: Parity::Even,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };

    let initial_handshake = resumed_handshake(local_settings.clone(), peer_settings.clone());
    let _root = session
        .establish_from_handshake(initial_handshake)
        .expect("initial handshake should establish session");

    let (link_a, _link_b) = crate::memory_link_pair(32);
    let (tx, rx) = crate::BareConduit::new(link_a).split();

    // Only the local request limit differs from the established settings.
    let changed_local_settings = ConnectionSettings {
        max_concurrent_requests: 65,
        ..local_settings
    };
    let resume_handshake = resumed_handshake(changed_local_settings, peer_settings);
    let result = session.resume_from_handshake(Arc::new(tx), Box::new(rx), resume_handshake);

    let is_local_mismatch = matches!(
        &result,
        Err(SessionError::Protocol(message))
            if message == "local root settings changed across session resume"
    );
    assert!(
        is_local_mismatch,
        "expected local-root-settings mismatch, got: {result:?}"
    );
}
3102
/// Resuming with peer root settings that differ from the ones the session was
/// established with must fail with a protocol error.
#[test]
fn resume_rejects_changed_peer_root_settings() {
    let mut session = make_session();
    let local_settings = ConnectionSettings {
        parity: Parity::Odd,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };
    let peer_settings = ConnectionSettings {
        parity: Parity::Even,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };

    let initial_handshake = resumed_handshake(local_settings.clone(), peer_settings.clone());
    let _root = session
        .establish_from_handshake(initial_handshake)
        .expect("initial handshake should establish session");

    let (link_a, _link_b) = crate::memory_link_pair(32);
    let (tx, rx) = crate::BareConduit::new(link_a).split();

    // Only the peer request limit differs from the established settings.
    let changed_peer_settings = ConnectionSettings {
        max_concurrent_requests: 65,
        ..peer_settings
    };
    let resume_handshake = resumed_handshake(local_settings, changed_peer_settings);
    let result = session.resume_from_handshake(Arc::new(tx), Box::new(rx), resume_handshake);

    let is_peer_mismatch = matches!(
        &result,
        Err(SessionError::Protocol(message))
            if message == "peer root settings changed across session resume"
    );
    assert!(
        is_peer_mismatch,
        "expected peer-root-settings mismatch, got: {result:?}"
    );
}
3149}