Skip to main content

vox_core/session/
mod.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6    time::Duration,
7};
8
9use facet_core::Shape;
10use moire::sync::mpsc;
11use tokio::sync::{mpsc as tokio_mpsc, oneshot as tokio_oneshot, watch};
12use tracing::{trace, warn};
13use vox_types::{
14    BoxFut, ChannelMessage, ConduitRx, ConduitTx, ConnectionAccept, ConnectionClose, ConnectionId,
15    ConnectionOpen, ConnectionReject, ConnectionSettings, Handler, HandshakeResult, IdAllocator,
16    MaybeSend, MaybeSync, Message, MessageFamily, MessagePayload, Metadata, Parity, RequestBody,
17    RequestId, RequestMessage, RequestResponse, SchemaMessage, SelfRef, SessionResumeKey,
18    SessionRole, TrySendError, VoxDebugSnapshot, VoxObserverHandle,
19};
20use vox_types::{
21    ConnectionCloseReason, ConnectionDebugSnapshot, ConnectionDebugState, DecodeErrorKind,
22    DriverTaskStatus,
23};
24
25mod builders;
26pub use builders::*;
27
/// Keepalive settings applied at the session protocol level.
///
/// NOTE(review): the field names indicate a ping/pong liveness probe; the exact
/// scheduling rules live in the session run loop, which is not part of this excerpt.
#[derive(Clone, Copy, Debug)]
pub struct SessionKeepaliveConfig {
    /// Time between keepalive pings (presumably measured ping-to-ping — confirm).
    pub ping_interval: Duration,
    /// How long to wait for the matching pong (presumably before declaring the
    /// peer unresponsive — confirm).
    pub pong_timeout: Duration,
}
34
35// ---------------------------------------------------------------------------
36// Connection acceptor trait
37// ---------------------------------------------------------------------------
38
39/// Metadata wrapper with typed getters for well-known `vox-*` keys.
40///
41/// Passed to [`ConnectionAcceptor::accept`] when a peer opens a connection.
42pub struct ConnectionRequest<'a> {
43    metadata: &'a [vox_types::MetadataEntry<'a>],
44    service: &'a str,
45}
46
47impl<'a> ConnectionRequest<'a> {
48    /// Build a connection request from metadata.
49    ///
50    /// Returns an error if the required `vox-service` metadata key is missing.
51    pub fn new(metadata: &'a [vox_types::MetadataEntry<'a>]) -> Result<Self, SessionError> {
52        let service = vox_types::metadata_get_str(metadata, "vox-service").ok_or_else(|| {
53            SessionError::Protocol("missing required vox-service metadata".into())
54        })?;
55        Ok(Self { metadata, service })
56    }
57
58    /// The requested service name (`vox-service` metadata key).
59    pub fn service(&self) -> &str {
60        self.service
61    }
62
63    /// The transport type (`vox-transport` metadata key).
64    pub fn transport(&self) -> Option<&str> {
65        vox_types::metadata_get_str(self.metadata, "vox-transport")
66    }
67
68    /// The peer address (`vox-peer-addr` metadata key).
69    pub fn peer_addr(&self) -> Option<&str> {
70        vox_types::metadata_get_str(self.metadata, "vox-peer-addr")
71    }
72
73    /// Whether this is a root or virtual connection.
74    pub fn is_root(&self) -> bool {
75        !self.is_virtual()
76    }
77
78    /// Whether this is a virtual connection.
79    pub fn is_virtual(&self) -> bool {
80        vox_types::metadata_get_str(self.metadata, "vox-connection-kind") == Some("virtual")
81    }
82
83    /// Look up a string value by key.
84    pub fn get_str(&self, key: &str) -> Option<&str> {
85        vox_types::metadata_get_str(self.metadata, key)
86    }
87
88    /// Look up a u64 value by key.
89    pub fn get_u64(&self, key: &str) -> Option<u64> {
90        vox_types::metadata_get_u64(self.metadata, key)
91    }
92
93    /// Access the raw metadata entries.
94    pub fn metadata(&self) -> &[vox_types::MetadataEntry<'a>] {
95        self.metadata
96    }
97}
98
99/// A connection that has been opened but not yet accepted.
100///
101/// The acceptor receives this and decides its fate by calling one of:
102/// - `handle_with(handler)` — run a Driver with this handler (common case)
103/// - `proxy_to(other_handle)` — pipe messages to/from another connection
104/// - `into_handle()` — take the raw ConnectionHandle for custom use
105pub struct PendingConnection {
106    handle: Option<ConnectionHandle>,
107    caller_slot: Option<Arc<std::sync::Mutex<Option<crate::Caller>>>>,
108    operation_store: Option<Arc<dyn crate::OperationStore>>,
109}
110
111impl PendingConnection {
112    fn new(handle: ConnectionHandle) -> Self {
113        Self {
114            handle: Some(handle),
115            caller_slot: None,
116            operation_store: None,
117        }
118    }
119
120    /// Create a PendingConnection that captures the Caller when handle_with is called.
121    fn with_caller_slot(
122        handle: ConnectionHandle,
123        caller_slot: Arc<std::sync::Mutex<Option<crate::Caller>>>,
124        operation_store: Option<Arc<dyn crate::OperationStore>>,
125    ) -> Self {
126        Self {
127            handle: Some(handle),
128            caller_slot: Some(caller_slot),
129            operation_store,
130        }
131    }
132
133    /// Accept this connection and run a Driver with the given handler.
134    pub fn handle_with(mut self, handler: impl Handler<crate::DriverReplySink> + 'static) {
135        let handle = self
136            .handle
137            .take()
138            .expect("PendingConnection already consumed");
139        let conn_id = handle.connection_id();
140        trace!(%conn_id, "PendingConnection::handle_with: creating driver");
141        let mut driver = match self.operation_store.take() {
142            Some(store) => crate::Driver::with_operation_store(handle, handler, store),
143            None => crate::Driver::new(handle, handler),
144        };
145        if let Some(slot) = &self.caller_slot {
146            let caller = crate::Caller::new(driver.caller());
147            *slot.lock().unwrap() = Some(caller);
148        }
149        #[cfg(not(target_arch = "wasm32"))]
150        tokio::spawn(async move {
151            trace!(%conn_id, "PendingConnection driver starting");
152            driver.run().await;
153            trace!(%conn_id, "PendingConnection driver exited");
154        });
155        #[cfg(target_arch = "wasm32")]
156        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
157    }
158
159    /// Accept this connection, run a Driver, and return a typed client for the peer.
160    pub fn handle_with_client<C: crate::FromVoxSession>(
161        mut self,
162        handler: impl Handler<crate::DriverReplySink> + 'static,
163    ) -> C {
164        let handle = self
165            .handle
166            .take()
167            .expect("PendingConnection already consumed");
168        let conn_id = handle.connection_id();
169        trace!(%conn_id, "PendingConnection::handle_with_client: creating driver");
170        let mut driver = match self.operation_store.take() {
171            Some(store) => crate::Driver::with_operation_store(handle, handler, store),
172            None => crate::Driver::new(handle, handler),
173        };
174        let caller = crate::Caller::new(driver.caller());
175        if let Some(slot) = &self.caller_slot {
176            *slot.lock().unwrap() = Some(caller.clone());
177        }
178        #[cfg(not(target_arch = "wasm32"))]
179        tokio::spawn(async move {
180            trace!(%conn_id, "PendingConnection driver starting");
181            driver.run().await;
182            trace!(%conn_id, "PendingConnection driver exited");
183        });
184        #[cfg(target_arch = "wasm32")]
185        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
186        C::from_vox_session(caller, None)
187    }
188
189    /// Accept this connection and proxy all traffic to/from another connection.
190    pub fn proxy_to(mut self, other: ConnectionHandle) {
191        let handle = self
192            .handle
193            .take()
194            .expect("PendingConnection already consumed");
195        #[cfg(not(target_arch = "wasm32"))]
196        tokio::spawn(async move {
197            let _ = proxy_connections(handle, other).await;
198        });
199        #[cfg(target_arch = "wasm32")]
200        wasm_bindgen_futures::spawn_local(async move {
201            let _ = proxy_connections(handle, other).await;
202        });
203    }
204
205    /// Take the raw ConnectionHandle for custom use.
206    pub fn into_handle(mut self) -> ConnectionHandle {
207        self.handle
208            .take()
209            .expect("PendingConnection already consumed")
210    }
211}
212
213impl Drop for PendingConnection {
214    fn drop(&mut self) {
215        if let Some(handle) = self.handle.take() {
216            let conn_id = handle.connection_id();
217            warn!(%conn_id, "PendingConnection dropped without being consumed — closing connection");
218            if let Some(tx) = handle.control_tx.as_ref() {
219                let _ = send_drop_control(tx, DropControlRequest::Close(conn_id));
220            }
221        }
222    }
223}
224
225// r[impl rpc.virtual-connection.accept]
226pub trait ConnectionAcceptor: MaybeSend + MaybeSync + 'static {
227    fn accept(
228        &self,
229        request: &ConnectionRequest,
230        connection: PendingConnection,
231    ) -> Result<(), Metadata<'static>>;
232}
233
234/// Any `Handler<DriverReplySink>` is automatically a `ConnectionAcceptor`.
235impl<H> ConnectionAcceptor for H
236where
237    H: Handler<crate::DriverReplySink> + Clone + MaybeSend + MaybeSync + 'static,
238{
239    fn accept(
240        &self,
241        _request: &ConnectionRequest,
242        connection: PendingConnection,
243    ) -> Result<(), Metadata<'static>> {
244        connection.handle_with(self.clone());
245        Ok(())
246    }
247}
248
/// Newtype that lets a plain closure act as a `ConnectionAcceptor`.
///
/// Usually built through [`acceptor_fn`].
pub struct AcceptorFn<F>(pub F);
251
252impl<F> ConnectionAcceptor for AcceptorFn<F>
253where
254    F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
255        + MaybeSend
256        + MaybeSync
257        + 'static,
258{
259    fn accept(
260        &self,
261        request: &ConnectionRequest,
262        connection: PendingConnection,
263    ) -> Result<(), Metadata<'static>> {
264        (self.0)(request, connection)
265    }
266}
267
268/// Create a `ConnectionAcceptor` from a closure.
269pub fn acceptor_fn<F>(f: F) -> AcceptorFn<F>
270where
271    F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
272        + MaybeSend
273        + MaybeSync
274        + 'static,
275{
276    AcceptorFn(f)
277}
278
279// ---------------------------------------------------------------------------
280// Open/close request types (from SessionHandle → run loop)
281// ---------------------------------------------------------------------------
282
283struct OpenRequest {
284    settings: ConnectionSettings,
285    metadata: Metadata<'static>,
286    result_tx: moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>,
287}
288
289struct CloseRequest {
290    conn_id: ConnectionId,
291    metadata: Metadata<'static>,
292    result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
293}
294
295struct ResumeRequest {
296    tx: Arc<dyn DynConduitTx>,
297    rx: Box<dyn DynConduitRx>,
298    handshake_result: HandshakeResult,
299    result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
300}
301
302#[derive(Debug, Clone, Copy)]
303pub(crate) enum DropControlRequest {
304    Shutdown,
305    Close(ConnectionId),
306}
307
/// How a failed request is classified when reported via `ConnectionSender::mark_failure`.
///
/// NOTE(review): the consumer of these values is outside this excerpt; the
/// per-variant descriptions below are inferred from the names — confirm.
#[derive(Debug, Clone, Copy)]
pub(crate) enum FailureDisposition {
    /// Presumably: the request is known not to have completed.
    Cancelled,
    /// Presumably: it is unknown whether the request took effect.
    Indeterminate,
}
313
314#[cfg(not(target_arch = "wasm32"))]
315fn send_drop_control(
316    tx: &mpsc::UnboundedSender<DropControlRequest>,
317    req: DropControlRequest,
318) -> Result<(), ()> {
319    tx.send(req).map_err(|_| ())
320}
321
/// wasm variant: push `req` onto the control channel with `try_send`,
/// collapsing any failure into `Err(())`.
#[cfg(target_arch = "wasm32")]
fn send_drop_control(
    tx: &mpsc::UnboundedSender<DropControlRequest>,
    req: DropControlRequest,
) -> Result<(), ()> {
    match tx.try_send(req) {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
329
330// ---------------------------------------------------------------------------
331// SessionHandle — cloneable handle for opening/closing virtual connections
332// ---------------------------------------------------------------------------
333
334/// Cloneable handle for opening and closing virtual connections.
335///
336/// Returned by the session builder alongside the `Session` and root
337/// `ConnectionHandle`. The session's `run()` loop must be running
338/// concurrently for requests to be processed.
339// r[impl rpc.virtual-connection.open]
340#[derive(Clone)]
341pub struct SessionHandle {
342    open_tx: mpsc::Sender<OpenRequest>,
343    close_tx: mpsc::Sender<CloseRequest>,
344    resume_tx: mpsc::Sender<ResumeRequest>,
345    control_tx: mpsc::UnboundedSender<DropControlRequest>,
346    resume_key: Option<SessionResumeKey>,
347}
348
349impl SessionHandle {
350    /// Open a typed virtual connection on the session.
351    ///
352    /// Sends `vox-service` metadata automatically from the client's
353    /// `SERVICE_NAME`. Creates a `Driver` and spawns it, returning
354    /// a ready-to-use typed client.
355    pub async fn open<Client: crate::FromVoxSession>(
356        &self,
357        settings: ConnectionSettings,
358    ) -> Result<Client, SessionError> {
359        use crate::{Caller, Driver};
360        use vox_types::{MetadataEntry, MetadataFlags, MetadataValue};
361
362        let metadata: Metadata<'static> = vec![MetadataEntry {
363            key: crate::session::builders::VOX_SERVICE_METADATA_KEY.into(),
364            value: MetadataValue::String(Client::SERVICE_NAME.into()),
365            flags: MetadataFlags::NONE,
366        }];
367        let handle = self.open_connection(settings, metadata).await?;
368        let mut driver = Driver::new(handle, ());
369        let caller = Caller::new(driver.caller());
370        #[cfg(not(target_arch = "wasm32"))]
371        tokio::spawn(async move { driver.run().await });
372        #[cfg(target_arch = "wasm32")]
373        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
374        Ok(Client::from_vox_session(caller, None))
375    }
376
377    /// Open a new virtual connection on the session.
378    ///
379    /// Allocates a connection ID, sends `ConnectionOpen` to the peer, and
380    /// waits for `ConnectionAccept` or `ConnectionReject`. The session's
381    /// `run()` loop processes the response and completes the returned future.
382    // r[impl connection.open]
383    pub async fn open_connection(
384        &self,
385        settings: ConnectionSettings,
386        metadata: Metadata<'static>,
387    ) -> Result<ConnectionHandle, SessionError> {
388        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.open_result");
389        self.open_tx
390            .send(OpenRequest {
391                settings,
392                metadata,
393                result_tx,
394            })
395            .await
396            .map_err(|_| SessionError::Protocol("session closed".into()))?;
397        result_rx
398            .await
399            .map_err(|_| SessionError::Protocol("session closed".into()))?
400    }
401
402    /// Close a virtual connection.
403    ///
404    /// Sends `ConnectionClose` to the peer and removes the connection slot.
405    /// After this returns, no further messages will be routed to the
406    /// connection's driver.
407    // r[impl connection.close]
408    pub async fn close_connection(
409        &self,
410        conn_id: ConnectionId,
411        metadata: Metadata<'static>,
412    ) -> Result<(), SessionError> {
413        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.close_result");
414        self.close_tx
415            .send(CloseRequest {
416                conn_id,
417                metadata,
418                result_tx,
419            })
420            .await
421            .map_err(|_| SessionError::Protocol("session closed".into()))?;
422        result_rx
423            .await
424            .map_err(|_| SessionError::Protocol("session closed".into()))?
425    }
426
427    pub(crate) async fn resume_parts(
428        &self,
429        tx: Arc<dyn DynConduitTx>,
430        rx: Box<dyn DynConduitRx>,
431        handshake_result: HandshakeResult,
432    ) -> Result<(), SessionError> {
433        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.resume_result");
434        self.resume_tx
435            .send(ResumeRequest {
436                tx,
437                rx,
438                handshake_result,
439                result_tx,
440            })
441            .await
442            .map_err(|_| SessionError::Protocol("session closed".into()))?;
443        result_rx
444            .await
445            .map_err(|_| SessionError::Protocol("session closed".into()))?
446    }
447
448    /// Returns the session resume key, if the session is resumable.
449    pub fn resume_key(&self) -> Option<&SessionResumeKey> {
450        self.resume_key.as_ref()
451    }
452
453    /// Request shutdown of the entire session (root + all virtual connections).
454    pub fn shutdown(&self) -> Result<(), SessionError> {
455        send_drop_control(&self.control_tx, DropControlRequest::Shutdown)
456            .map_err(|_| SessionError::Protocol("session closed".into()))
457    }
458}
459
460// ---------------------------------------------------------------------------
461// Session
462// ---------------------------------------------------------------------------
463
464/// Session state machine.
465// r[impl session]
466// r[impl rpc.one-service-per-connection]
467pub struct Session {
468    /// Conduit receiver
469    rx: Box<dyn DynConduitRx>,
470
471    // r[impl session.role]
472    role: SessionRole,
473
474    /// Our local parity — determines which connection IDs we allocate.
475    // r[impl session.parity]
476    parity: Parity,
477
478    /// Shared core (for sending) — also held by all ConnectionSenders.
479    sess_core: Arc<SessionCore>,
480    peer_supports_retry: bool,
481    local_root_settings: ConnectionSettings,
482    peer_root_settings: Option<ConnectionSettings>,
483    resumable: bool,
484    session_resume_key: Option<SessionResumeKey>,
485
486    /// Connection state (active, pending inbound, pending outbound).
487    conns: BTreeMap<ConnectionId, ConnectionSlot>,
488    /// Whether the root connection was internally closed because all root callers dropped.
489    root_closed_internal: bool,
490
491    /// Allocator for outbound virtual connection IDs (uses session parity).
492    conn_ids: IdAllocator<ConnectionId>,
493
494    /// Callback for accepting inbound virtual connections.
495    on_connection: Option<Arc<dyn ConnectionAcceptor>>,
496
497    /// Receiver for open requests from SessionHandle.
498    open_rx: mpsc::Receiver<OpenRequest>,
499
500    /// Receiver for close requests from SessionHandle.
501    close_rx: mpsc::Receiver<CloseRequest>,
502
503    /// Receiver for resume requests from SessionHandle.
504    resume_rx: mpsc::Receiver<ResumeRequest>,
505
506    /// Sender/receiver for drop-driven session/connection control requests.
507    control_tx: mpsc::UnboundedSender<DropControlRequest>,
508    control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
509
510    /// Optional proactive keepalive runtime config for connection ID 0.
511    keepalive: Option<SessionKeepaliveConfig>,
512    resume_notifier: watch::Sender<u64>,
513    recoverer: Option<Box<dyn ConduitRecoverer>>,
514    recovery_timeout: Option<Duration>,
515    /// Whether this session was registered in a `SessionRegistry`, meaning
516    /// an external acceptor could route a reconnecting client to resume it.
517    registered_in_registry: bool,
518
519    observer: Option<VoxObserverHandle>,
520}
521
522#[derive(Debug)]
523struct KeepaliveRuntime {
524    ping_interval: Duration,
525    pong_timeout: Duration,
526    next_ping_at: vox_types::time::tokio::Instant,
527    waiting_pong_nonce: Option<u64>,
528    pong_deadline: vox_types::time::tokio::Instant,
529    next_ping_nonce: u64,
530}
531
532// r[impl connection]
533/// Static data for one active connection.
534#[derive(Debug)]
535pub struct ConnectionState {
536    /// Unique connection identifier
537    pub id: ConnectionId,
538
539    /// Our settings
540    pub local_settings: ConnectionSettings,
541
542    /// The peer's settings
543    pub peer_settings: ConnectionSettings,
544
545    /// Sender for routing incoming messages to the per-connection driver task.
546    conn_tx: mpsc::Sender<RecvMessage>,
547    closed_tx: watch::Sender<Option<ConnectionCloseReason>>,
548
549    /// Per-connection schema recv tracker — schemas are scoped to a connection.
550    schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
551}
552
553#[derive(Debug)]
554enum ConnectionSlot {
555    Active(ConnectionState),
556    PendingOutbound(PendingOutboundData),
557}
558
559/// Debug-printable wrapper that omits the oneshot sender.
560struct PendingOutboundData {
561    local_settings: ConnectionSettings,
562    result_tx: Option<moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>>,
563}
564
565impl std::fmt::Debug for PendingOutboundData {
566    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
567        f.debug_struct("PendingOutbound")
568            .field("local_settings", &self.local_settings)
569            .finish()
570    }
571}
572
573#[derive(Clone)]
574pub(crate) struct ConnectionSender {
575    connection_id: ConnectionId,
576    pub(crate) sess_core: Arc<SessionCore>,
577    failures: Arc<mpsc::UnboundedSender<(RequestId, FailureDisposition)>>,
578}
579
580fn forwarded_payload<'a>(payload: &'a vox_types::Payload<'a>) -> vox_types::Payload<'a> {
581    let vox_types::Payload::PostcardBytes(bytes) = payload else {
582        unreachable!("proxy forwarding expects decoded incoming payload bytes")
583    };
584    vox_types::Payload::PostcardBytes(bytes)
585}
586
587fn forwarded_request_body<'a>(body: &'a RequestBody<'a>) -> RequestBody<'a> {
588    match body {
589        RequestBody::Call(call) => RequestBody::Call(vox_types::RequestCall {
590            method_id: call.method_id,
591            metadata: call.metadata.clone(),
592            args: forwarded_payload(&call.args),
593            schemas: call.schemas.clone(),
594        }),
595        RequestBody::Response(response) => RequestBody::Response(RequestResponse {
596            metadata: response.metadata.clone(),
597            ret: forwarded_payload(&response.ret),
598            schemas: response.schemas.clone(),
599        }),
600        RequestBody::Cancel(cancel) => RequestBody::Cancel(vox_types::RequestCancel {
601            metadata: cancel.metadata.clone(),
602        }),
603    }
604}
605
606fn forwarded_channel_body<'a>(body: &'a vox_types::ChannelBody<'a>) -> vox_types::ChannelBody<'a> {
607    match body {
608        vox_types::ChannelBody::Item(item) => {
609            vox_types::ChannelBody::Item(vox_types::ChannelItem {
610                item: forwarded_payload(&item.item),
611            })
612        }
613        vox_types::ChannelBody::Close(close) => {
614            vox_types::ChannelBody::Close(vox_types::ChannelClose {
615                metadata: close.metadata.clone(),
616            })
617        }
618        vox_types::ChannelBody::Reset(reset) => {
619            vox_types::ChannelBody::Reset(vox_types::ChannelReset {
620                metadata: reset.metadata.clone(),
621            })
622        }
623        vox_types::ChannelBody::GrantCredit(credit) => {
624            vox_types::ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
625                additional: credit.additional,
626            })
627        }
628    }
629}
630
631impl ConnectionSender {
632    pub(crate) fn connection_id(&self) -> ConnectionId {
633        self.connection_id
634    }
635
636    pub(crate) async fn send_with_binder<'a>(
637        &self,
638        msg: ConnectionMessage<'a>,
639        binder: Option<&'a dyn vox_types::ChannelBinder>,
640    ) -> Result<(), ()> {
641        let payload = match msg {
642            ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
643            ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
644        };
645        let message = Message {
646            connection_id: self.connection_id,
647            payload,
648        };
649        self.sess_core
650            .send(message, binder, None)
651            .await
652            .map_err(|_| ())
653    }
654
655    /// Send an arbitrary connection message
656    pub async fn send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), ()> {
657        self.send_with_binder(msg, None).await
658    }
659
660    // r[impl rpc.flow-control.credit.try-send]
661    pub(crate) fn try_send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), TrySendError<()>> {
662        let payload = match msg {
663            ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
664            ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
665        };
666        self.sess_core.try_send(
667            Message {
668                connection_id: self.connection_id,
669                payload,
670            },
671            None,
672            None,
673        )
674    }
675
676    /// Send a received connection message without re-materializing payload values.
677    pub(crate) async fn send_owned(
678        &self,
679        schemas: Arc<vox_types::SchemaRecvTracker>,
680        msg: SelfRef<ConnectionMessage<'static>>,
681    ) -> Result<(), ()> {
682        let msg_ref = msg.get();
683        let payload = match msg_ref {
684            ConnectionMessage::Request(request) => MessagePayload::RequestMessage(RequestMessage {
685                id: request.id,
686                body: forwarded_request_body(&request.body),
687            }),
688            ConnectionMessage::Channel(channel) => MessagePayload::ChannelMessage(ChannelMessage {
689                id: channel.id,
690                body: forwarded_channel_body(&channel.body),
691            }),
692        };
693
694        self.sess_core
695            .send(
696                Message {
697                    connection_id: self.connection_id,
698                    payload,
699                },
700                None,
701                Some(&*schemas),
702            )
703            .await
704            .map_err(|_| ())
705    }
706
707    /// Send a response specifically
708    pub async fn send_response<'a>(
709        &self,
710        request_id: RequestId,
711        response: RequestResponse<'a>,
712    ) -> Result<(), ()> {
713        self.send(ConnectionMessage::Request(RequestMessage {
714            id: request_id,
715            body: RequestBody::Response(response),
716        }))
717        .await
718    }
719
720    /// Shape a response using an explicit method ID, then send it.
721    pub async fn send_response_for_method<'a>(
722        &self,
723        request_id: RequestId,
724        method_id: vox_types::MethodId,
725        mut response: RequestResponse<'a>,
726    ) -> Result<(), ()> {
727        self.prepare_response_for_method(request_id, method_id, &mut response);
728        self.send(ConnectionMessage::Request(RequestMessage {
729            id: request_id,
730            body: RequestBody::Response(response),
731        }))
732        .await
733    }
734
735    /// Shape a response using an explicit method ID without sending it yet.
736    pub(crate) fn prepare_response_for_method(
737        &self,
738        request_id: RequestId,
739        method_id: vox_types::MethodId,
740        response: &mut RequestResponse<'_>,
741    ) {
742        self.sess_core.prepare_response_for_method(
743            self.connection_id,
744            request_id,
745            method_id,
746            response,
747        );
748    }
749
750    /// Shape a response using an explicit canonical root type and schema source.
751    pub(crate) fn prepare_response_from_source(
752        &self,
753        request_id: RequestId,
754        method_id: vox_types::MethodId,
755        root_type: &vox_types::TypeRef,
756        source: &dyn vox_types::SchemaSource,
757        response: &mut RequestResponse<'_>,
758    ) {
759        self.sess_core.prepare_response_from_source(
760            self.connection_id,
761            request_id,
762            method_id,
763            root_type,
764            source,
765            response,
766        );
767    }
768
769    /// Mark a request as failed by removing any pending response slot.
770    /// Called when a send error occurs or no reply was sent.
771    pub fn mark_failure(&self, request_id: RequestId, disposition: FailureDisposition) {
772        let _ = self.failures.send((request_id, disposition));
773    }
774
775    /// Prepare schemas for a replay response from the running code's
776    /// static response shape — same source of truth fresh responses
777    /// use, with the same connection-scoped dedup.
778    pub fn prepare_replay_schemas(
779        &self,
780        request_id: RequestId,
781        method_id: vox_types::MethodId,
782        response_shape: &'static Shape,
783        response: &mut RequestResponse<'_>,
784    ) {
785        self.sess_core.prepare_response_from_shape(
786            self.connection_id,
787            request_id,
788            method_id,
789            response_shape,
790            response,
791        );
792    }
793}
794
795pub struct ConnectionHandle {
796    pub(crate) sender: ConnectionSender,
797    pub(crate) rx: mpsc::Receiver<RecvMessage>,
798    pub(crate) failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
799    pub(crate) control_tx: Option<mpsc::UnboundedSender<DropControlRequest>>,
800    pub(crate) closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
801    pub(crate) resumed_rx: watch::Receiver<u64>,
802    pub(crate) local_settings: ConnectionSettings,
803    pub(crate) peer_settings: ConnectionSettings,
804    /// The parity this side should use for allocating request/channel IDs.
805    pub parity: Parity,
806    pub(crate) peer_supports_retry: bool,
807    pub(crate) observer: Option<VoxObserverHandle>,
808}
809
810impl std::fmt::Debug for ConnectionHandle {
811    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812        f.debug_struct("ConnectionHandle")
813            .field("connection_id", &self.sender.connection_id)
814            .finish()
815    }
816}
817
818pub(crate) enum ConnectionMessage<'payload> {
819    Request(RequestMessage<'payload>),
820    Channel(ChannelMessage<'payload>),
821}
822
823vox_types::impl_reborrow!(ConnectionMessage);
824
825/// A message routed to a driver, carrying the `SchemaRecvTracker` that was
826/// current when the session received it. This ensures each message uses the
827/// correct tracker even across reconnections.
828pub(crate) struct RecvMessage {
829    pub schemas: Arc<vox_types::SchemaRecvTracker>,
830    pub msg: SelfRef<ConnectionMessage<'static>>,
831    /// Descriptors that arrived with this frame (`SCM_RIGHTS`). Threaded to
832    /// the typed-decode site; `()` off-Unix.
833    pub fds: vox_types::FrameFds,
834}
835
836impl ConnectionHandle {
837    /// Returns the connection ID for this handle.
838    pub fn connection_id(&self) -> ConnectionId {
839        self.sender.connection_id
840    }
841
842    /// Resolve when this connection closes.
843    pub async fn closed(&self) {
844        if self.closed_rx.borrow().is_some() {
845            return;
846        }
847        let mut rx = self.closed_rx.clone();
848        while rx.changed().await.is_ok() {
849            if rx.borrow().is_some() {
850                return;
851            }
852        }
853    }
854
855    /// Return whether this connection is still considered connected.
856    pub fn is_connected(&self) -> bool {
857        self.closed_rx.borrow().is_none()
858    }
859
860    pub fn close_reason(&self) -> Option<ConnectionCloseReason> {
861        *self.closed_rx.borrow()
862    }
863
864    pub fn peer_supports_retry(&self) -> bool {
865        self.peer_supports_retry
866    }
867
868    // r[impl rpc.debug.snapshot]
869    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
870        let (outbound_queue_depth, outbound_queue_capacity) =
871            self.sender.sess_core.outbound_queue_stats();
872        VoxDebugSnapshot {
873            connections: vec![ConnectionDebugSnapshot {
874                connection_id: self.connection_id(),
875                endpoint: None,
876                surface: None,
877                component: None,
878                state: if self.closed_rx.borrow().is_some() {
879                    ConnectionDebugState::Closed
880                } else {
881                    ConnectionDebugState::Open
882                },
883                outstanding_requests: 0,
884                requests: Vec::new(),
885                open_channels: Vec::new(),
886                outbound_queue_depth: Some(outbound_queue_depth),
887                outbound_queue_capacity: Some(outbound_queue_capacity),
888                local_control_queue_depth: None,
889                local_control_queue_capacity: None,
890                last_inbound_message_at: None,
891                last_outbound_message_at: None,
892                last_progress_at: None,
893                close_reason: *self.closed_rx.borrow(),
894                driver_task_status: DriverTaskStatus::Unknown,
895            }],
896        }
897    }
898
899    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
900        let snapshot = self.debug_snapshot();
901        tracing::info!(?snapshot, "vox debug snapshot");
902        snapshot
903    }
904}
905
906/// Forward all request/channel traffic between two connections.
907///
908/// This is a protocol-level bridge: it does not inspect service schemas or method IDs.
909/// It exits when either side closes or a forward send fails, then requests closure of
910/// both underlying connections.
911pub async fn proxy_connections(
912    left: ConnectionHandle,
913    right: ConnectionHandle,
914) -> Result<(), SessionError> {
915    if left.parity == right.parity {
916        return Err(SessionError::Protocol(
917            "proxy_connections requires opposite parities".into(),
918        ));
919    }
920    let left_conn_id = left.connection_id();
921    let right_conn_id = right.connection_id();
922    let ConnectionHandle {
923        sender: left_sender,
924        rx: mut left_rx,
925        failures_rx: _left_failures_rx,
926        control_tx: left_control_tx,
927        closed_rx: _left_closed_rx,
928        resumed_rx: _left_resumed_rx,
929        local_settings: _left_local_settings,
930        peer_settings: _left_peer_settings,
931        parity: _left_parity,
932        peer_supports_retry: _left_peer_supports_retry,
933        observer: _left_observer,
934    } = left;
935    let ConnectionHandle {
936        sender: right_sender,
937        rx: mut right_rx,
938        failures_rx: _right_failures_rx,
939        control_tx: right_control_tx,
940        closed_rx: _right_closed_rx,
941        resumed_rx: _right_resumed_rx,
942        local_settings: _right_local_settings,
943        peer_settings: _right_peer_settings,
944        parity: _right_parity,
945        peer_supports_retry: _right_peer_supports_retry,
946        observer: _right_observer,
947    } = right;
948
949    loop {
950        tokio::select! {
951            recv = left_rx.recv() => {
952                let Some(recv) = recv else {
953                    break;
954                };
955                if right_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
956                    break;
957                }
958            }
959            recv = right_rx.recv() => {
960                let Some(recv) = recv else {
961                    break;
962                };
963                if left_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
964                    break;
965                }
966            }
967        }
968    }
969
970    if let Some(tx) = left_control_tx.as_ref() {
971        let _ = send_drop_control(tx, DropControlRequest::Close(left_conn_id));
972    }
973    if let Some(tx) = right_control_tx.as_ref() {
974        let _ = send_drop_control(tx, DropControlRequest::Close(right_conn_id));
975    }
976    Ok(())
977}
978
/// Errors that can occur during session establishment or operation.
#[derive(Debug)]
pub enum SessionError {
    /// An underlying I/O error.
    Io(std::io::Error),
    /// A protocol violation, with a human-readable description.
    Protocol(String),
    /// The connection was rejected.
    // NOTE(review): presumably carries the metadata sent with the rejection — confirm.
    Rejected(Metadata<'static>),
    /// The session is not resumable.
    NotResumable,
    /// The connection attempt timed out.
    ConnectTimeout,
}
988
989impl SessionError {
990    /// Returns `true` if a retry of the same connection attempt may succeed.
991    ///
992    /// I/O errors and timeouts are transient — the remote might become available
993    /// shortly. Protocol errors and explicit rejections are permanent for this
994    /// peer address and will not resolve by retrying.
995    pub fn is_retryable(&self) -> bool {
996        matches!(
997            self,
998            Self::Io(_) | Self::ConnectTimeout | Self::NotResumable
999        )
1000    }
1001}
1002
1003impl std::fmt::Display for SessionError {
1004    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1005        match self {
1006            Self::Io(e) => write!(f, "io error: {e}"),
1007            Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
1008            Self::Rejected(_) => write!(f, "connection rejected"),
1009            Self::NotResumable => write!(f, "session is not resumable"),
1010            Self::ConnectTimeout => write!(f, "connect timeout"),
1011        }
1012    }
1013}
1014
// No `source()` override: the default implementation is used, so the inner
// error of the `Io` variant is only surfaced through the `Display` text.
impl std::error::Error for SessionError {}
1016
1017fn classify_session_recv_error(error: &std::io::Error) -> ConnectionCloseReason {
1018    let message = error.to_string();
1019    if message.contains("decode error") || message.contains("protocol") {
1020        ConnectionCloseReason::Protocol
1021    } else {
1022        ConnectionCloseReason::Transport
1023    }
1024}
1025
1026fn classify_decode_error(error: &std::io::Error) -> Option<DecodeErrorKind> {
1027    let message = error.to_string();
1028    if message.contains("decode error") {
1029        Some(DecodeErrorKind::Payload)
1030    } else {
1031        None
1032    }
1033}
1034
1035impl Session {
1036    // r[impl rpc.observability.session-errors]
1037    // r[impl rpc.observability.driver]
1038    fn observe_session_recv_error(&self, error: &std::io::Error) {
1039        let Some(observer) = &self.observer else {
1040            return;
1041        };
1042
1043        if let Some(kind) = classify_decode_error(error) {
1044            for conn_id in self.conns.iter().filter_map(|(conn_id, slot)| {
1045                matches!(slot, ConnectionSlot::Active(_)).then_some(*conn_id)
1046            }) {
1047                observer.driver_event(vox_types::DriverEvent::DecodeError {
1048                    connection_id: conn_id,
1049                    kind,
1050                });
1051            }
1052            return;
1053        }
1054
1055        observer.transport_event(vox_types::TransportEvent::Closed {
1056            connection_id: None,
1057            reason: classify_session_recv_error(error),
1058        });
1059    }
1060
1061    fn close_connection_for_protocol_error(
1062        &mut self,
1063        conn_id: ConnectionId,
1064        detail: impl std::fmt::Display,
1065    ) {
1066        warn!(%conn_id, "closing connection after protocol error: {detail}");
1067        self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Protocol);
1068        self.maybe_request_shutdown_after_root_closed();
1069    }
1070
1071    fn record_received_schema_cbor(
1072        &mut self,
1073        conn_id: ConnectionId,
1074        schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
1075        method_id: vox_types::MethodId,
1076        direction: vox_types::BindingDirection,
1077        schemas_cbor: &vox_types::CborPayload,
1078        context: &str,
1079    ) -> bool {
1080        let payload = match vox_types::SchemaPayload::from_cbor(&schemas_cbor.0) {
1081            Ok(payload) => payload,
1082            Err(error) => {
1083                self.close_connection_for_protocol_error(
1084                    conn_id,
1085                    format!("{context}: invalid schema CBOR: {error}"),
1086                );
1087                return false;
1088            }
1089        };
1090
1091        if let Err(error) = schema_recv_tracker.record_received(method_id, direction, payload) {
1092            self.close_connection_for_protocol_error(conn_id, format!("{context}: {error}"));
1093            return false;
1094        }
1095
1096        true
1097    }
1098
1099    #[allow(clippy::too_many_arguments)]
1100    fn pre_handshake<Tx, Rx>(
1101        tx: Tx,
1102        rx: Rx,
1103        on_connection: Option<Arc<dyn ConnectionAcceptor>>,
1104        open_rx: mpsc::Receiver<OpenRequest>,
1105        close_rx: mpsc::Receiver<CloseRequest>,
1106        resume_rx: mpsc::Receiver<ResumeRequest>,
1107        control_tx: mpsc::UnboundedSender<DropControlRequest>,
1108        control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
1109        keepalive: Option<SessionKeepaliveConfig>,
1110        resumable: bool,
1111        recoverer: Option<Box<dyn ConduitRecoverer>>,
1112        recovery_timeout: Option<Duration>,
1113        observer: Option<VoxObserverHandle>,
1114    ) -> Self
1115    where
1116        Tx: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
1117        Rx: ConduitRx<Msg = MessageFamily> + MaybeSend + 'static,
1118    {
1119        let (outbound_tx, outbound_rx) = tokio_mpsc::channel(256);
1120        let sess_core = Arc::new(SessionCore {
1121            inner: std::sync::Mutex::new(SessionCoreInner {
1122                tx: Arc::new(tx) as Arc<dyn DynConduitTx>,
1123                conns: HashMap::new(),
1124            }),
1125            outbound_tx,
1126            observer: observer.clone(),
1127        });
1128        spawn_outbound_worker(outbound_rx);
1129        let (resume_notifier, _resume_rx) = watch::channel(0_u64);
1130        Session {
1131            rx: Box::new(rx),
1132            role: SessionRole::Initiator, // overwritten in establish_as_*
1133            parity: Parity::Odd,          // overwritten in establish_as_*
1134            sess_core,
1135            peer_supports_retry: false,
1136            local_root_settings: ConnectionSettings {
1137                parity: Parity::Odd,
1138                max_concurrent_requests: 64,
1139                initial_channel_credit: 16,
1140            },
1141            peer_root_settings: None,
1142            resumable,
1143            session_resume_key: None,
1144            conns: BTreeMap::new(),
1145            root_closed_internal: false,
1146            conn_ids: IdAllocator::new(Parity::Odd), // overwritten in establish_as_*
1147            on_connection,
1148            open_rx,
1149            close_rx,
1150            resume_rx,
1151            control_tx,
1152            control_rx,
1153            keepalive,
1154            resume_notifier,
1155            recoverer,
1156            recovery_timeout,
1157            registered_in_registry: false,
1158            observer,
1159        }
1160    }
1161
    /// Returns the session's current resume key, or `None` if none has been
    /// recorded.
    pub(crate) fn resume_key(&self) -> Option<SessionResumeKey> {
        self.session_resume_key
    }
1165
1166    // r[impl session.handshake]
1167    fn establish_from_handshake(
1168        &mut self,
1169        result: HandshakeResult,
1170    ) -> Result<ConnectionHandle, SessionError> {
1171        self.role = result.role;
1172        self.parity = result.our_settings.parity;
1173        self.conn_ids = IdAllocator::new(result.our_settings.parity);
1174        self.local_root_settings = result.our_settings.clone();
1175        self.peer_root_settings = Some(result.peer_settings.clone());
1176        self.peer_supports_retry = result.peer_supports_retry;
1177        self.session_resume_key = result.session_resume_key;
1178
1179        if self.resumable && self.session_resume_key.is_none() {
1180            return Err(SessionError::NotResumable);
1181        }
1182
1183        Ok(self.make_root_handle(result.our_settings, result.peer_settings))
1184    }
1185
    /// Create the handle for the root connection (`ConnectionId::ROOT`).
    ///
    /// Thin wrapper around `make_connection_handle`.
    fn make_root_handle(
        &mut self,
        local_settings: ConnectionSettings,
        peer_settings: ConnectionSettings,
    ) -> ConnectionHandle {
        self.make_connection_handle(ConnectionId::ROOT, local_settings, peer_settings)
    }
1193
1194    fn make_connection_handle(
1195        &mut self,
1196        conn_id: ConnectionId,
1197        local_settings: ConnectionSettings,
1198        peer_settings: ConnectionSettings,
1199    ) -> ConnectionHandle {
1200        let label = format!("session.conn{}", conn_id.0);
1201        let (conn_tx, conn_rx) = mpsc::channel::<RecvMessage>(&label, 64);
1202        let (failures_tx, failures_rx) = mpsc::unbounded_channel(format!("{label}.failures"));
1203        let (closed_tx, closed_rx) = watch::channel(None);
1204        let resumed_rx = self.resume_notifier.subscribe();
1205
1206        let sender = ConnectionSender {
1207            connection_id: conn_id,
1208            sess_core: Arc::clone(&self.sess_core),
1209            failures: Arc::new(failures_tx),
1210        };
1211
1212        let parity = local_settings.parity;
1213        let handle_local_settings = local_settings.clone();
1214        let handle_peer_settings = peer_settings.clone();
1215        trace!(%conn_id, "make_connection_handle: inserting slot into conns");
1216        if let Some(observer) = &self.observer {
1217            observer.driver_event(vox_types::DriverEvent::ConnectionOpened {
1218                connection_id: conn_id,
1219            });
1220        }
1221        self.conns.insert(
1222            conn_id,
1223            ConnectionSlot::Active(ConnectionState {
1224                id: conn_id,
1225                local_settings,
1226                peer_settings,
1227                conn_tx,
1228                closed_tx,
1229                schema_recv_tracker: Arc::new(vox_types::SchemaRecvTracker::new()),
1230            }),
1231        );
1232
1233        ConnectionHandle {
1234            sender,
1235            rx: conn_rx,
1236            failures_rx,
1237            control_tx: Some(self.control_tx.clone()),
1238            closed_rx,
1239            resumed_rx,
1240            local_settings: handle_local_settings,
1241            peer_settings: handle_peer_settings,
1242            parity,
1243            peer_supports_retry: self.peer_supports_retry,
1244            observer: self.observer.clone(),
1245        }
1246    }
1247
    /// Run the session recv loop: read from the conduit, demux by connection
    /// ID, and route to the appropriate connection's driver. Also processes
    /// open/close requests from the SessionHandle.
    ///
    /// The loop ends when the conduit breaks and `handle_conduit_break`
    /// cannot recover it, when a drop-control request asks the session to
    /// stop, or when a keepalive tick reports failure. In each case all
    /// connections are closed with the matching reason before leaving the
    /// loop, and once more with `SessionShutdown` afterwards.
    // r[impl zerocopy.framing.pipeline.incoming]
    pub async fn run(&mut self) {
        let mut keepalive_runtime = self.make_keepalive_runtime();
        // The tick timer only exists when keepalive is active; it fires every
        // 10 ms and delays rather than bursts after a missed tick.
        let mut keepalive_tick = keepalive_runtime.as_ref().map(|_| {
            let mut interval = vox_types::time::tokio::interval(Duration::from_millis(10));
            interval.set_missed_tick_behavior(vox_types::time::tokio::MissedTickBehavior::Delay);
            interval
        });

        loop {
            tokio::select! {
                // biased: ensure conduit EOF is processed before any resume
                // request. Without this, tokio's random branch selection can
                // pick resume_rx when BOTH branches are simultaneously ready
                // (fast client reconnect on Linux), causing the session to
                // reject a valid resume while still in CONNECTED state.
                biased;

                msg = self.rx.recv_msg() => {
                    vox_types::dlog!("[session {:?}] recv_msg returned", self.role);
                    match msg {
                        Ok(Some(msg)) => {
                            // Capture the frame's descriptors before the next
                            // recv overwrites them; thread them with the msg.
                            let fds = self.rx.take_frame_fds();
                            self.handle_message(msg, fds, &mut keepalive_runtime).await;
                        }
                        // Clean EOF: try to recover; otherwise close as `Remote`.
                        Ok(None) => {
                            vox_types::dlog!("[session {:?}] recv loop: conduit returned EOF", self.role);
                            if !self.handle_conduit_break(&mut keepalive_runtime).await {
                                vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                                self.close_all_connections(ConnectionCloseReason::Remote);
                                break;
                            }
                        }
                        // Receive error: classify it first so that the same
                        // reason is used if recovery turns out to be impossible.
                        Err(error) => {
                            let close_reason = classify_session_recv_error(&error);
                            self.observe_session_recv_error(&error);
                            warn!(
                                role = ?self.role,
                                %error,
                                ?close_reason,
                                "session receive failed; closing connections if recovery is unavailable"
                            );
                            vox_types::dlog!("[session {:?}] recv loop: conduit recv error: {}", self.role, error);
                            if !self.handle_conduit_break(&mut keepalive_runtime).await {
                                vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                                self.close_all_connections(close_reason);
                                break;
                            }
                        }
                    }
                }
                Some(req) = self.open_rx.recv() => {
                    self.handle_open_request(req).await;
                }
                Some(req) = self.close_rx.recv() => {
                    self.handle_close_request(req).await;
                }
                // A resume request that arrives while the conduit is still
                // being served is rejected.
                Some(req) = self.resume_rx.recv() => {
                    let _ = req.result_tx.send(Err(SessionError::Protocol(
                        "resume is only valid while the session is disconnected".into(),
                    )));
                }
                // A `false` result from the handler stops the session.
                Some(req) = self.control_rx.recv() => {
                    if !self.handle_drop_control_request(req).await {
                        self.close_all_connections(ConnectionCloseReason::Local);
                        break;
                    }
                }
                // Only polled when a keepalive timer exists (see the guard).
                _ = async {
                    if let Some(interval) = keepalive_tick.as_mut() {
                        interval.tick().await;
                    }
                }, if keepalive_tick.is_some() => {
                    if !self.handle_keepalive_tick(&mut keepalive_runtime).await {
                        self.close_all_connections(ConnectionCloseReason::Protocol);
                        break;
                    }
                }
            }
        }

        // Drop all connection slots so per-connection drivers exit immediately.
        self.close_all_connections(ConnectionCloseReason::SessionShutdown);
        trace!("session recv loop exited");
    }
1338
1339    async fn handle_conduit_break(
1340        &mut self,
1341        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1342    ) -> bool {
1343        // Recovery strategy:
1344        // 1. If we have a recoverer (client-side auto-reconnect), use it.
1345        // 2. If we're registered in a SessionRegistry (server-side), wait
1346        //    for the registry to route a reconnecting client to us.
1347        // 3. Otherwise, the session is done.
1348
1349        if let Some(recoverer) = self.recoverer.as_mut() {
1350            let recovery_fut = recoverer.next_conduit(self.session_resume_key.as_ref());
1351            let recovery_result = match self.recovery_timeout {
1352                Some(timeout) => match vox_types::time::tokio::timeout(timeout, recovery_fut).await
1353                {
1354                    Ok(r) => r,
1355                    Err(_) => return false,
1356                },
1357                None => recovery_fut.await,
1358            };
1359            match recovery_result {
1360                Ok(recovered) => {
1361                    let result =
1362                        self.resume_from_handshake(recovered.tx, recovered.rx, recovered.handshake);
1363                    match result {
1364                        Ok(()) => {
1365                            let next_generation = self.resume_notifier.borrow().wrapping_add(1);
1366                            let _ = self.resume_notifier.send(next_generation);
1367                            *keepalive_runtime = self.make_keepalive_runtime();
1368                            return true;
1369                        }
1370                        Err(_) => return false,
1371                    }
1372                }
1373                Err(_) => return false,
1374            }
1375        }
1376
1377        if !self.registered_in_registry {
1378            return false;
1379        }
1380
1381        loop {
1382            tokio::select! {
1383                Some(req) = self.resume_rx.recv() => {
1384                    let result =
1385                        self.resume_from_handshake(req.tx, req.rx, req.handshake_result);
1386                    let ok = result.is_ok();
1387                    let _ = req.result_tx.send(result);
1388                    if ok {
1389                        let next_generation = self.resume_notifier.borrow().wrapping_add(1);
1390                        let _ = self.resume_notifier.send(next_generation);
1391                        *keepalive_runtime = self.make_keepalive_runtime();
1392                        return true;
1393                    }
1394                }
1395                Some(req) = self.control_rx.recv() => {
1396                    if !self.handle_drop_control_request(req).await {
1397                        return false;
1398                    }
1399                }
1400                Some(req) = self.open_rx.recv() => {
1401                    let _ = req.result_tx.send(Err(SessionError::Protocol(
1402                        "session is disconnected; resume before opening connections".into(),
1403                    )));
1404                }
1405                Some(req) = self.close_rx.recv() => {
1406                    let _ = req.result_tx.send(Err(SessionError::Protocol(
1407                        "session is disconnected; resume before closing connections".into(),
1408                    )));
1409                }
1410                else => return false,
1411            }
1412        }
1413    }
1414
1415    // r[impl session.handshake.resume]
1416    fn resume_from_handshake(
1417        &mut self,
1418        tx: Arc<dyn DynConduitTx>,
1419        rx: Box<dyn DynConduitRx>,
1420        result: HandshakeResult,
1421    ) -> Result<(), SessionError> {
1422        let Some(peer_settings) = self.peer_root_settings.clone() else {
1423            return Err(SessionError::Protocol("missing peer root settings".into()));
1424        };
1425
1426        if result.our_settings != self.local_root_settings {
1427            return Err(SessionError::Protocol(
1428                "local root settings changed across session resume".into(),
1429            ));
1430        }
1431
1432        if result.peer_settings != peer_settings {
1433            return Err(SessionError::Protocol(
1434                "peer root settings changed across session resume".into(),
1435            ));
1436        }
1437
1438        self.peer_supports_retry = result.peer_supports_retry;
1439        self.session_resume_key = result.session_resume_key.or(self.session_resume_key);
1440
1441        self.sess_core.replace_tx_and_reset_schemas(tx);
1442        self.rx = rx;
1443        // Reset the root connection's recv tracker on reconnection —
1444        // type IDs are per-connection and must not carry over.
1445        if let Some(ConnectionSlot::Active(state)) = self.conns.get_mut(&ConnectionId::ROOT) {
1446            state.schema_recv_tracker = Arc::new(vox_types::SchemaRecvTracker::new());
1447        }
1448        Ok(())
1449    }
1450
1451    async fn handle_message(
1452        &mut self,
1453        msg: SelfRef<Message<'static>>,
1454        fds: vox_types::FrameFds,
1455        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1456    ) {
1457        let msg_ref = msg.get();
1458        let conn_id = msg_ref.connection_id;
1459        match &msg_ref.payload {
1460            MessagePayload::Ping(ping) => {
1461                let _ = self
1462                    .sess_core
1463                    .send(
1464                        Message {
1465                            connection_id: conn_id,
1466                            payload: MessagePayload::Pong(vox_types::Pong { nonce: ping.nonce }),
1467                        },
1468                        None,
1469                        None,
1470                    )
1471                    .await;
1472                return;
1473            }
1474            MessagePayload::Pong(pong) => {
1475                if conn_id.is_root() {
1476                    self.handle_keepalive_pong(pong.nonce, keepalive_runtime);
1477                }
1478                return;
1479            }
1480            MessagePayload::SchemaMessage(schema_msg) => {
1481                let schema_recv_tracker = match self.conns.get(&conn_id) {
1482                    Some(ConnectionSlot::Active(state)) => Arc::clone(&state.schema_recv_tracker),
1483                    _ => return,
1484                };
1485                let _ = self.record_received_schema_cbor(
1486                    conn_id,
1487                    schema_recv_tracker,
1488                    schema_msg.method_id,
1489                    schema_msg.direction,
1490                    &schema_msg.schemas,
1491                    "standalone schema message",
1492                );
1493                return;
1494            }
1495            _ => {}
1496        }
1497        vox_types::selfref_match!(msg, payload {
1498            // r[impl connection.close.semantics]
1499            MessagePayload::ConnectionClose(_) => {
1500                if conn_id.is_root() {
1501                    warn!("received ConnectionClose for root connection");
1502                } else {
1503                    trace!(conn_id = conn_id.0, "received ConnectionClose for virtual connection");
1504                }
1505                // Remove the connection — dropping conn_tx causes the Driver's rx
1506                // to return None, which exits its run loop. All in-flight handlers
1507                // are dropped, triggering DriverReplySink::drop → Cancelled responses.
1508                self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Remote);
1509                self.maybe_request_shutdown_after_root_closed();
1510            }
1511            MessagePayload::ConnectionOpen(open) => {
1512                self.handle_inbound_open(conn_id, open).await;
1513            }
1514            MessagePayload::ConnectionAccept(accept) => {
1515                self.handle_inbound_accept(conn_id, accept);
1516            }
1517            MessagePayload::ConnectionReject(reject) => {
1518                self.handle_inbound_reject(conn_id, reject);
1519            }
1520            MessagePayload::RequestMessage(r) => {
1521                let r_ref = r.get();
1522                vox_types::dlog!(
1523                    "[session {:?}] recv request: conn={:?} req={:?} body={} method={:?}",
1524                    self.role,
1525                    conn_id,
1526                    r_ref.id,
1527                    match &r_ref.body {
1528                        RequestBody::Call(_) => "Call",
1529                        RequestBody::Response(_) => "Response",
1530                        RequestBody::Cancel(_) => "Cancel",
1531                    },
1532                    match &r_ref.body {
1533                        RequestBody::Call(call) => Some(call.method_id),
1534                        RequestBody::Response(_) | RequestBody::Cancel(_) => None,
1535                    }
1536                );
1537                // Record any inlined schemas from the incoming request before routing
1538                let response_had_schema_payload = matches!(&r_ref.body, RequestBody::Response(resp) if !resp.schemas.is_empty());
1539                {
1540                    let schemas_cbor = match &r_ref.body {
1541                        RequestBody::Call(call) => Some(&call.schemas),
1542                        RequestBody::Response(resp) => Some(&resp.schemas),
1543                        _ => None,
1544                    };
1545                    vox_types::dlog!(
1546                        "[schema] recv ({:?}): req={:?} body={} schemas_len={:?}",
1547                        self.role,
1548                        r_ref.id,
1549                    match &r_ref.body {
1550                            RequestBody::Call(_) => "Call",
1551                            RequestBody::Response(_) => "Response",
1552                            RequestBody::Cancel(_) => "Cancel",
1553                        },
1554                        schemas_cbor.map(|s| s.0.len())
1555                    );
1556                    let schema_recv_tracker = match self.conns.get(&conn_id) {
1557                        Some(ConnectionSlot::Active(state)) => {
1558                            Arc::clone(&state.schema_recv_tracker)
1559                        }
1560                        _ => return,
1561                    };
1562                    if let Some(schemas_cbor) = schemas_cbor
1563                        && !schemas_cbor.is_empty()
1564                    {
1565                        let (method_id, direction) = match &r_ref.body {
1566                            RequestBody::Call(call) => {
1567                                (call.method_id, vox_types::BindingDirection::Args)
1568                            }
1569                            RequestBody::Response(_) => {
1570                                let Some(method_id) =
1571                                    self.sess_core.take_outgoing_call_method(conn_id, r_ref.id)
1572                                else {
1573                                    self.close_connection_for_protocol_error(
1574                                        conn_id,
1575                                        format!(
1576                                            "response schemas for unknown inflight request {:?}",
1577                                            r_ref.id
1578                                        ),
1579                                    );
1580                                    return;
1581                                };
1582                                (method_id, vox_types::BindingDirection::Response)
1583                            }
1584                            RequestBody::Cancel(_) => unreachable!(),
1585                        };
1586                        if !self.record_received_schema_cbor(
1587                            conn_id,
1588                            schema_recv_tracker,
1589                            method_id,
1590                            direction,
1591                            schemas_cbor,
1592                            "inlined request schemas",
1593                        ) {
1594                            return;
1595                        }
1596                    }
1597                }
1598                if matches!(&r_ref.body, RequestBody::Response(_)) && !response_had_schema_payload {
1599                    let _ = self.sess_core.take_outgoing_call_method(conn_id, r_ref.id);
1600                }
1601                // Record incoming calls so SessionCore::send() can look up
1602                // the method_id when sending the response.
1603                if let RequestBody::Call(call) = &r_ref.body {
1604                    self.sess_core.record_incoming_call(conn_id, r_ref.id, call.method_id);
1605                }
1606                let state = match self.conns.get(&conn_id) {
1607                    Some(ConnectionSlot::Active(state)) => state,
1608                    _ => return,
1609                };
1610                let conn_tx = state.conn_tx.clone();
1611                let request_id = r_ref.id;
1612                let body_kind = match &r_ref.body {
1613                    RequestBody::Call(_) => "Call",
1614                    RequestBody::Response(_) => "Response",
1615                    RequestBody::Cancel(_) => "Cancel",
1616                };
1617                let recv_msg = RecvMessage {
1618                    schemas: Arc::clone(&state.schema_recv_tracker),
1619                    msg: r.map(ConnectionMessage::Request),
1620                    fds,
1621                };
1622                vox_types::dlog!(
1623                    "[session {:?}] dispatch request: conn={:?} req={:?} body={}",
1624                    self.role,
1625                    conn_id,
1626                    request_id,
1627                    body_kind
1628                );
1629                if conn_tx.send(recv_msg).await.is_err() {
1630                    self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
1631                    self.maybe_request_shutdown_after_root_closed();
1632                }
1633            }
1634            MessagePayload::ChannelMessage(c) => {
1635                let state = match self.conns.get(&conn_id) {
1636                    Some(ConnectionSlot::Active(state)) => state,
1637                    _ => return,
1638                };
1639                let conn_tx = state.conn_tx.clone();
1640                let recv_msg = RecvMessage {
1641                    schemas: Arc::clone(&state.schema_recv_tracker),
1642                    msg: c.map(ConnectionMessage::Channel),
1643                    fds,
1644                };
1645                if conn_tx.send(recv_msg).await.is_err() {
1646                    self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
1647                    self.maybe_request_shutdown_after_root_closed();
1648                }
1649            }
1650            // ProtocolError: not valid post-handshake, drop.
1651        })
1652    }
1653
1654    fn make_keepalive_runtime(&self) -> Option<KeepaliveRuntime> {
1655        let config = self.keepalive?;
1656        if config.ping_interval.is_zero() || config.pong_timeout.is_zero() {
1657            warn!("keepalive disabled due to non-positive interval/timeout");
1658            return None;
1659        }
1660        let now = vox_types::time::tokio::Instant::now();
1661        Some(KeepaliveRuntime {
1662            ping_interval: config.ping_interval,
1663            pong_timeout: config.pong_timeout,
1664            next_ping_at: now + config.ping_interval,
1665            waiting_pong_nonce: None,
1666            pong_deadline: now,
1667            next_ping_nonce: 1,
1668        })
1669    }
1670
1671    fn handle_keepalive_pong(&self, nonce: u64, keepalive_runtime: &mut Option<KeepaliveRuntime>) {
1672        let Some(runtime) = keepalive_runtime.as_mut() else {
1673            return;
1674        };
1675        if runtime.waiting_pong_nonce != Some(nonce) {
1676            return;
1677        }
1678        runtime.waiting_pong_nonce = None;
1679        runtime.next_ping_at = vox_types::time::tokio::Instant::now() + runtime.ping_interval;
1680    }
1681
1682    async fn handle_keepalive_tick(
1683        &mut self,
1684        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1685    ) -> bool {
1686        let Some(runtime) = keepalive_runtime.as_mut() else {
1687            return true;
1688        };
1689        let now = vox_types::time::tokio::Instant::now();
1690
1691        if let Some(waiting_nonce) = runtime.waiting_pong_nonce {
1692            if now >= runtime.pong_deadline {
1693                warn!(
1694                    nonce = waiting_nonce,
1695                    timeout_ms = runtime.pong_timeout.as_millis(),
1696                    "keepalive timeout waiting for pong"
1697                );
1698                return false;
1699            }
1700            return true;
1701        }
1702
1703        if now < runtime.next_ping_at {
1704            return true;
1705        }
1706
1707        let nonce = runtime.next_ping_nonce;
1708        if self
1709            .sess_core
1710            .send(
1711                Message {
1712                    connection_id: ConnectionId::ROOT,
1713                    payload: MessagePayload::Ping(vox_types::Ping { nonce }),
1714                },
1715                None,
1716                None,
1717            )
1718            .await
1719            .is_err()
1720        {
1721            warn!("failed to send keepalive ping");
1722            return false;
1723        }
1724
1725        runtime.waiting_pong_nonce = Some(nonce);
1726        runtime.pong_deadline = now + runtime.pong_timeout;
1727        runtime.next_ping_at = now + runtime.ping_interval;
1728        runtime.next_ping_nonce = runtime.next_ping_nonce.wrapping_add(1);
1729        true
1730    }
1731
1732    async fn handle_inbound_open(
1733        &mut self,
1734        conn_id: ConnectionId,
1735        open: SelfRef<ConnectionOpen<'static>>,
1736    ) {
1737        // Validate: connection ID must match peer's parity (opposite of ours).
1738        let peer_parity = self.parity.other();
1739        if !conn_id.has_parity(peer_parity) {
1740            // Protocol error: wrong parity. For now, just reject.
1741            let _ = self
1742                .sess_core
1743                .send(
1744                    Message {
1745                        connection_id: conn_id,
1746                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1747                            metadata: vec![],
1748                        }),
1749                    },
1750                    None,
1751                    None,
1752                )
1753                .await;
1754            return;
1755        }
1756
1757        // Validate: connection ID must not already be in use.
1758        if self.conns.contains_key(&conn_id) {
1759            // Protocol error: duplicate connection ID.
1760            let _ = self
1761                .sess_core
1762                .send(
1763                    Message {
1764                        connection_id: conn_id,
1765                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1766                            metadata: vec![],
1767                        }),
1768                    },
1769                    None,
1770                    None,
1771                )
1772                .await;
1773            return;
1774        }
1775
1776        // r[impl connection.open.rejection]
1777        // Call the acceptor callback. If none is registered, reject.
1778        if self.on_connection.is_none() {
1779            let _ = self
1780                .sess_core
1781                .send(
1782                    Message {
1783                        connection_id: conn_id,
1784                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1785                            metadata: vec![],
1786                        }),
1787                    },
1788                    None,
1789                    None,
1790                )
1791                .await;
1792            return;
1793        }
1794
1795        // Derive settings: opposite parity, same limits for now.
1796        let open = open.get();
1797        if open.connection_settings.initial_channel_credit == 0 {
1798            let _ = self
1799                .sess_core
1800                .send(
1801                    Message {
1802                        connection_id: conn_id,
1803                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1804                            metadata: vec![vox_types::MetadataEntry::str(
1805                                "error",
1806                                "initial_channel_credit must be greater than zero",
1807                            )],
1808                        }),
1809                    },
1810                    None,
1811                    None,
1812                )
1813                .await;
1814            return;
1815        }
1816
1817        let our_settings = ConnectionSettings {
1818            parity: open.connection_settings.parity.other(),
1819            max_concurrent_requests: open.connection_settings.max_concurrent_requests,
1820            initial_channel_credit: open.connection_settings.initial_channel_credit,
1821        };
1822
1823        // Create the connection handle and activate it.
1824        let handle = self.make_connection_handle(
1825            conn_id,
1826            our_settings.clone(),
1827            open.connection_settings.clone(),
1828        );
1829
1830        // Let the acceptor decide the connection's fate.
1831        let mut metadata: Vec<vox_types::MetadataEntry<'_>> = open.metadata.to_vec();
1832        metadata.push(vox_types::MetadataEntry::str(
1833            "vox-connection-kind",
1834            "virtual",
1835        ));
1836        let request = match ConnectionRequest::new(&metadata) {
1837            Ok(r) => r,
1838            Err(e) => {
1839                trace!(%conn_id, %e, "rejecting virtual connection");
1840                self.conns.remove(&conn_id);
1841                let _ = self
1842                    .sess_core
1843                    .send(
1844                        Message {
1845                            connection_id: conn_id,
1846                            payload: MessagePayload::ConnectionReject(
1847                                vox_types::ConnectionReject {
1848                                    metadata: vec![vox_types::MetadataEntry::str(
1849                                        "error",
1850                                        e.to_string(),
1851                                    )],
1852                                },
1853                            ),
1854                        },
1855                        None,
1856                        None,
1857                    )
1858                    .await;
1859                return;
1860            }
1861        };
1862        let pending = PendingConnection::new(handle);
1863        let acceptor = self.on_connection.as_ref().unwrap();
1864        trace!(%conn_id, "calling acceptor for virtual connection");
1865        match acceptor.accept(&request, pending) {
1866            Ok(()) => {
1867                trace!(%conn_id, "acceptor accepted virtual connection, sending ConnectionAccept");
1868                let _ = self
1869                    .sess_core
1870                    .send(
1871                        Message {
1872                            connection_id: conn_id,
1873                            payload: MessagePayload::ConnectionAccept(
1874                                vox_types::ConnectionAccept {
1875                                    connection_settings: our_settings,
1876                                    metadata: vec![],
1877                                },
1878                            ),
1879                        },
1880                        None,
1881                        None,
1882                    )
1883                    .await;
1884            }
1885            Err(reject_metadata) => {
1886                // Clean up the connection slot we created.
1887                trace!(%conn_id, "acceptor rejected, removing conn slot");
1888                self.conns.remove(&conn_id);
1889                let _ = self
1890                    .sess_core
1891                    .send(
1892                        Message {
1893                            connection_id: conn_id,
1894                            payload: MessagePayload::ConnectionReject(
1895                                vox_types::ConnectionReject {
1896                                    metadata: reject_metadata,
1897                                },
1898                            ),
1899                        },
1900                        None,
1901                        None,
1902                    )
1903                    .await;
1904            }
1905        }
1906    }
1907
1908    fn handle_inbound_accept(
1909        &mut self,
1910        conn_id: ConnectionId,
1911        accept: SelfRef<ConnectionAccept<'static>>,
1912    ) {
1913        let accept = accept.get();
1914        let slot = self.remove_connection(&conn_id);
1915        match slot {
1916            Some(ConnectionSlot::PendingOutbound(mut pending))
1917                if accept.connection_settings.initial_channel_credit == 0 =>
1918            {
1919                if let Some(tx) = pending.result_tx.take() {
1920                    let _ = tx.send(Err(SessionError::Protocol(
1921                        "initial_channel_credit must be greater than zero".into(),
1922                    )));
1923                }
1924            }
1925            Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1926                let handle = self.make_connection_handle(
1927                    conn_id,
1928                    pending.local_settings.clone(),
1929                    accept.connection_settings.clone(),
1930                );
1931
1932                if let Some(tx) = pending.result_tx.take() {
1933                    let _ = tx.send(Ok(handle));
1934                }
1935            }
1936            Some(other) => {
1937                // Not pending outbound — put it back and ignore.
1938                self.conns.insert(conn_id, other);
1939            }
1940            None => {
1941                // No pending open for this ID — ignore.
1942            }
1943        }
1944    }
1945
1946    fn handle_inbound_reject(
1947        &mut self,
1948        conn_id: ConnectionId,
1949        reject: SelfRef<ConnectionReject<'static>>,
1950    ) {
1951        let reject = reject.get();
1952        let slot = self.remove_connection(&conn_id);
1953        match slot {
1954            Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1955                if let Some(tx) = pending.result_tx.take() {
1956                    let _ = tx.send(Err(SessionError::Rejected(vox_types::metadata_into_owned(
1957                        reject.metadata.to_vec(),
1958                    ))));
1959                }
1960            }
1961            Some(other) => {
1962                self.conns.insert(conn_id, other);
1963            }
1964            None => {}
1965        }
1966    }
1967
1968    // r[impl connection.open]
1969    async fn handle_open_request(&mut self, req: OpenRequest) {
1970        if req.settings.initial_channel_credit == 0 {
1971            let _ = req.result_tx.send(Err(SessionError::Protocol(
1972                "initial_channel_credit must be greater than zero".into(),
1973            )));
1974            return;
1975        }
1976
1977        let conn_id = self.conn_ids.alloc();
1978
1979        // Send ConnectionOpen to the peer.
1980        let send_result = self
1981            .sess_core
1982            .send(
1983                Message {
1984                    connection_id: conn_id,
1985                    payload: MessagePayload::ConnectionOpen(ConnectionOpen {
1986                        connection_settings: req.settings.clone(),
1987                        metadata: req.metadata,
1988                    }),
1989                },
1990                None,
1991                None,
1992            )
1993            .await;
1994
1995        if send_result.is_err() {
1996            let _ = req.result_tx.send(Err(SessionError::Protocol(
1997                "failed to send ConnectionOpen".into(),
1998            )));
1999            return;
2000        }
2001
2002        // Store the pending state. The run loop will complete the oneshot
2003        // when ConnectionAccept or ConnectionReject arrives.
2004        self.conns.insert(
2005            conn_id,
2006            ConnectionSlot::PendingOutbound(PendingOutboundData {
2007                local_settings: req.settings,
2008                result_tx: Some(req.result_tx),
2009            }),
2010        );
2011    }
2012
2013    // r[impl connection.close]
2014    async fn handle_close_request(&mut self, req: CloseRequest) {
2015        if req.conn_id.is_root() {
2016            let _ = req.result_tx.send(Err(SessionError::Protocol(
2017                "cannot close root connection".into(),
2018            )));
2019            return;
2020        }
2021
2022        // Remove the connection slot — this drops conn_tx and causes the
2023        // Driver to exit cleanly.
2024        if self
2025            .remove_connection_with_reason(&req.conn_id, ConnectionCloseReason::Local)
2026            .is_none()
2027        {
2028            let _ = req
2029                .result_tx
2030                .send(Err(SessionError::Protocol("connection not found".into())));
2031            return;
2032        }
2033
2034        // Send ConnectionClose to the peer.
2035        let send_result = self
2036            .sess_core
2037            .send(
2038                Message {
2039                    connection_id: req.conn_id,
2040                    payload: MessagePayload::ConnectionClose(ConnectionClose {
2041                        metadata: req.metadata,
2042                    }),
2043                },
2044                None,
2045                None,
2046            )
2047            .await;
2048
2049        if send_result.is_err() {
2050            let _ = req.result_tx.send(Err(SessionError::Protocol(
2051                "failed to send ConnectionClose".into(),
2052            )));
2053            return;
2054        }
2055
2056        let _ = req.result_tx.send(Ok(()));
2057        self.maybe_request_shutdown_after_root_closed();
2058    }
2059
2060    async fn handle_drop_control_request(&mut self, req: DropControlRequest) -> bool {
2061        match req {
2062            DropControlRequest::Shutdown => {
2063                trace!("session shutdown requested");
2064                false
2065            }
2066            DropControlRequest::Close(conn_id) => {
2067                // r[impl rpc.caller.liveness.last-drop-closes-connection]
2068                if conn_id.is_root() {
2069                    // r[impl rpc.caller.liveness.root-internal-close]
2070                    trace!("root callers dropped; internally closing root connection");
2071                    self.root_closed_internal = true;
2072                    // r[impl rpc.caller.liveness.root-teardown-condition]
2073                    return self.has_virtual_connections();
2074                }
2075
2076                if self
2077                    .remove_connection_with_reason(&conn_id, ConnectionCloseReason::Local)
2078                    .is_some()
2079                {
2080                    let _ = self
2081                        .sess_core
2082                        .send(
2083                            Message {
2084                                connection_id: conn_id,
2085                                payload: MessagePayload::ConnectionClose(ConnectionClose {
2086                                    metadata: vec![],
2087                                }),
2088                            },
2089                            None,
2090                            None,
2091                        )
2092                        .await;
2093                }
2094
2095                !self.root_closed_internal || self.has_virtual_connections()
2096            }
2097        }
2098    }
2099
2100    fn has_virtual_connections(&self) -> bool {
2101        self.conns.keys().any(|id| !id.is_root())
2102    }
2103
2104    fn remove_connection(&mut self, conn_id: &ConnectionId) -> Option<ConnectionSlot> {
2105        self.remove_connection_with_reason(conn_id, ConnectionCloseReason::Unknown)
2106    }
2107
2108    fn remove_connection_with_reason(
2109        &mut self,
2110        conn_id: &ConnectionId,
2111        reason: ConnectionCloseReason,
2112    ) -> Option<ConnectionSlot> {
2113        trace!(%conn_id, "remove_connection called");
2114        let slot = self.conns.remove(conn_id);
2115        if let Some(ConnectionSlot::Active(state)) = &slot {
2116            let _ = state.closed_tx.send(Some(reason));
2117            if let Some(observer) = &self.observer {
2118                observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2119                    connection_id: *conn_id,
2120                    reason,
2121                });
2122            }
2123        }
2124        slot
2125    }
2126
2127    // r[impl rpc.observability.session-errors]
2128    fn close_all_connections(&mut self, reason: ConnectionCloseReason) {
2129        trace!(role = ?self.role, count = self.conns.len(), "close_all_connections");
2130        vox_types::dlog!(
2131            "[session {:?}] close_all_connections: {} slots",
2132            self.role,
2133            self.conns.len()
2134        );
2135        for (conn_id, slot) in self.conns.iter() {
2136            if let ConnectionSlot::Active(state) = slot {
2137                vox_types::dlog!("[session {:?}] closing connection {:?}", self.role, conn_id);
2138                let _ = state.closed_tx.send(Some(reason));
2139                if let Some(observer) = &self.observer {
2140                    observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2141                        connection_id: *conn_id,
2142                        reason,
2143                    });
2144                }
2145            }
2146        }
2147        self.conns.clear();
2148    }
2149
2150    fn maybe_request_shutdown_after_root_closed(&self) {
2151        if self.root_closed_internal && !self.has_virtual_connections() {
2152            let _ = send_drop_control(&self.control_tx, DropControlRequest::Shutdown);
2153        }
2154    }
2155}
2156
/// Shared send-side core of a session.
///
/// Outbound messages are turned into `OutboundBatch`es and pushed onto
/// `outbound_tx`; callers then wait for the batch's result.
pub(crate) struct SessionCore {
    /// Conduit sender plus per-connection send state, guarded by a std mutex.
    inner: std::sync::Mutex<SessionCoreInner>,
    /// Queue of prepared batches awaiting transmission.
    outbound_tx: tokio_mpsc::Sender<OutboundBatch>,
    /// Optional observer that is told about driver events.
    observer: Option<VoxObserverHandle>,
}
2162
/// Marker trait for a prepared outbound send: a `'static`, `MaybeSend` future
/// that resolves to `std::io::Result<()>`.
///
/// It is implemented for every type that satisfies those bounds, so it acts
/// as a nameable alias for the bound set (usable as `dyn OutboundSendFuture`).
pub trait OutboundSendFuture: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
impl<T> OutboundSendFuture for T where T: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}

/// Boxed, pinned, type-erased outbound send future.
type OutboundSend = Pin<Box<dyn OutboundSendFuture>>;
2167
/// A schema transmission planned for one method binding, to be sent ahead of
/// the payload of the batch it belongs to.
#[derive(Clone)]
struct PendingSchemaSend {
    /// Method the schemas are bound to.
    method_id: vox_types::MethodId,
    /// Which side of the method (per `BindingDirection`) the schemas describe.
    direction: vox_types::BindingDirection,
    /// Prepared plan handed to the connection's schema send tracker.
    prepared: vox_types::PreparedSchemaPlan,
}
2174
/// One unit of work for the outbound worker: optional schema messages
/// followed by a single payload send, plus a channel for the outcome.
struct OutboundBatch {
    /// Connection the schema messages are addressed to.
    conn_id: ConnectionId,
    /// Send-side state of that connection (schema tracker, planned bindings).
    conn_state: Arc<std::sync::Mutex<SendConnState>>,
    /// Conduit sender used to prepare the schema messages.
    tx: Arc<dyn DynConduitTx>,
    /// Schema sends to attempt, in order, before the payload.
    schema_sends: Vec<PendingSchemaSend>,
    /// Already-prepared send of the payload message.
    payload_send: OutboundSend,
    /// Receives the overall result of the batch.
    result_tx: tokio_oneshot::Sender<std::io::Result<()>>,
}
2183
2184async fn run_outbound_worker(mut rx: tokio_mpsc::Receiver<OutboundBatch>) {
2185    while let Some(batch) = rx.recv().await {
2186        let mut result = Ok(());
2187        for schema_send in batch.schema_sends {
2188            let schemas = {
2189                let mut conn_state = batch
2190                    .conn_state
2191                    .lock()
2192                    .expect("send conn state mutex poisoned");
2193                conn_state.send_tracker.preview_prepared_plan(
2194                    schema_send.method_id,
2195                    schema_send.direction,
2196                    &schema_send.prepared,
2197                )
2198            };
2199            if schemas.is_empty() {
2200                continue;
2201            }
2202
2203            let schema_msg = Message {
2204                connection_id: batch.conn_id,
2205                payload: MessagePayload::SchemaMessage(SchemaMessage {
2206                    method_id: schema_send.method_id,
2207                    direction: schema_send.direction,
2208                    schemas,
2209                }),
2210            };
2211            let send = match batch.tx.clone().prepare_msg(schema_msg, None) {
2212                Ok(send) => send,
2213                Err(error) => {
2214                    result = Err(error);
2215                    break;
2216                }
2217            };
2218            if let Err(error) = send.await {
2219                result = Err(error);
2220                break;
2221            }
2222            let mut conn_state = batch
2223                .conn_state
2224                .lock()
2225                .expect("send conn state mutex poisoned");
2226            conn_state.send_tracker.mark_prepared_plan_sent(
2227                schema_send.method_id,
2228                schema_send.direction,
2229                &schema_send.prepared,
2230            );
2231            conn_state
2232                .planned_bindings
2233                .remove(&(schema_send.direction, schema_send.method_id));
2234        }
2235        if result.is_ok()
2236            && let Err(error) = batch.payload_send.await
2237        {
2238            result = Err(error);
2239        }
2240        let _ = batch.result_tx.send(result);
2241    }
2242}
2243
2244#[cfg(not(target_arch = "wasm32"))]
2245fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
2246    if tokio::runtime::Handle::try_current().is_ok() {
2247        tokio::spawn(run_outbound_worker(rx));
2248        return;
2249    }
2250
2251    std::thread::spawn(move || {
2252        let runtime = tokio::runtime::Builder::new_current_thread()
2253            .enable_all()
2254            .build()
2255            .expect("build outbound worker runtime");
2256        runtime.block_on(run_outbound_worker(rx));
2257    });
2258}
2259
/// Starts the outbound worker for `rx` on wasm32 as a local task.
#[cfg(target_arch = "wasm32")]
fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
    let worker = run_outbound_worker(rx);
    wasm_bindgen_futures::spawn_local(worker);
}
2264
/// Send-side bookkeeping kept per connection.
struct SendConnState {
    /// Tracks which schemas we have sent on this connection.
    send_tracker: vox_types::SchemaSendTracker,

    /// Maps request_id → method_id for in-flight incoming calls, so we can
    /// look up the method_id when sending the response.
    inflight_incoming: HashMap<RequestId, vox_types::MethodId>,

    /// Maps request_id → method_id for outbound calls awaiting a response, so
    /// inbound response schema payloads can bind their root TypeRef.
    inflight_outgoing: HashMap<RequestId, vox_types::MethodId>,

    /// Structured schema plans cached per binding until the first committed send.
    planned_bindings:
        HashMap<(vox_types::BindingDirection, vox_types::MethodId), vox_types::PreparedSchemaPlan>,
}
2281
2282impl SendConnState {
2283    fn new() -> Self {
2284        SendConnState {
2285            send_tracker: vox_types::SchemaSendTracker::new(),
2286            inflight_incoming: HashMap::new(),
2287            inflight_outgoing: HashMap::new(),
2288            planned_bindings: HashMap::new(),
2289        }
2290    }
2291}
2292
/// State held behind the `SessionCore` mutex.
struct SessionCoreInner {
    /// Underlying conduit (tx end)
    tx: Arc<dyn DynConduitTx>,

    /// Per-connection state re: sent schemas, etc.
    /// Entries are created on first use by `get_or_create_send_conn_state`.
    conns: HashMap<ConnectionId, Arc<std::sync::Mutex<SendConnState>>>,
}
2300
2301fn get_or_create_send_conn_state(
2302    inner: &mut SessionCoreInner,
2303    conn_id: ConnectionId,
2304) -> Arc<std::sync::Mutex<SendConnState>> {
2305    inner
2306        .conns
2307        .entry(conn_id)
2308        .or_insert_with(|| Arc::new(std::sync::Mutex::new(SendConnState::new())))
2309        .clone()
2310}
2311
2312impl SessionCore {
2313    pub(crate) fn outbound_queue_stats(&self) -> (usize, usize) {
2314        let capacity = self.outbound_tx.max_capacity();
2315        let available = self.outbound_tx.capacity();
2316        (capacity.saturating_sub(available), capacity)
2317    }
2318
2319    fn prepare_outbound_batch<'a>(
2320        &self,
2321        mut msg: Message<'a>,
2322        binder: Option<&'a dyn vox_types::ChannelBinder>,
2323        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2324    ) -> Result<(OutboundBatch, tokio_oneshot::Receiver<std::io::Result<()>>), ()> {
2325        let conn_id = msg.connection_id;
2326        let (tx, conn_state, schema_sends) = {
2327            let mut inner = self.inner.lock().expect("session core mutex poisoned");
2328            let tx = inner.tx.clone();
2329            let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2330            drop(inner);
2331
2332            if let MessagePayload::RequestMessage(req) = &mut msg.payload {
2333                vox_types::dlog!(
2334                    "[session-core] send request: conn={:?} req={:?} body={} forwarded={}",
2335                    conn_id,
2336                    req.id,
2337                    match &req.body {
2338                        RequestBody::Call(_) => "Call",
2339                        RequestBody::Response(_) => "Response",
2340                        RequestBody::Cancel(_) => "Cancel",
2341                    },
2342                    forwarded_schemas.is_some()
2343                );
2344                let schema_sends = {
2345                    let mut conn_state_guard =
2346                        conn_state.lock().expect("send conn state mutex poisoned");
2347                    let mut schema_sends = Vec::new();
2348                    match &mut req.body {
2349                        RequestBody::Call(call) => {
2350                            if let Some(schema_send) = Self::plan_call_schema_send(
2351                                &mut conn_state_guard,
2352                                req.id,
2353                                call.method_id,
2354                                call,
2355                                forwarded_schemas,
2356                            ) {
2357                                schema_sends.push(schema_send);
2358                            }
2359                            call.schemas = Default::default();
2360                        }
2361                        RequestBody::Response(resp) => {
2362                            if let Some(method_id) =
2363                                conn_state_guard.inflight_incoming.remove(&req.id)
2364                                && let Some(schema_send) = Self::plan_response_schema_send(
2365                                    &mut conn_state_guard,
2366                                    req.id,
2367                                    method_id,
2368                                    resp,
2369                                    forwarded_schemas,
2370                                )
2371                            {
2372                                schema_sends.push(schema_send);
2373                            }
2374                            resp.schemas = Default::default();
2375                        }
2376                        RequestBody::Cancel(_) => {}
2377                    }
2378                    schema_sends
2379                };
2380                (tx, conn_state, schema_sends)
2381            } else {
2382                (tx, conn_state, Vec::new())
2383            }
2384        };
2385        let payload_send = tx.clone().prepare_msg(msg, binder).map_err(|_| ())?;
2386
2387        let (result_tx, result_rx) = tokio_oneshot::channel();
2388        Ok((
2389            OutboundBatch {
2390                conn_id,
2391                conn_state,
2392                tx,
2393                schema_sends,
2394                payload_send,
2395                result_tx,
2396            },
2397            result_rx,
2398        ))
2399    }
2400
2401    // r[impl schema.principles.sender-driven]
2402    pub(crate) async fn send<'a>(
2403        &self,
2404        msg: Message<'a>,
2405        binder: Option<&'a dyn vox_types::ChannelBinder>,
2406        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2407    ) -> Result<(), ()> {
2408        let connection_id = msg.connection_id;
2409        let (batch, result_rx) = self.prepare_outbound_batch(msg, binder, forwarded_schemas)?;
2410        if self.outbound_tx.send(batch).await.is_err() {
2411            if let Some(observer) = &self.observer {
2412                observer
2413                    .driver_event(vox_types::DriverEvent::OutboundQueueClosed { connection_id });
2414            }
2415            return Err(());
2416        }
2417        let result = result_rx.await.map_err(|_| ());
2418        match result? {
2419            Ok(()) => Ok(()),
2420            Err(_) => {
2421                if let Some(observer) = &self.observer {
2422                    observer.driver_event(vox_types::DriverEvent::EncodeError {
2423                        connection_id,
2424                        kind: vox_types::EncodeErrorKind::Transport,
2425                    });
2426                }
2427                Err(())
2428            }
2429        }
2430    }
2431
2432    // r[impl rpc.flow-control.credit.try-send]
2433    pub(crate) fn try_send<'a>(
2434        &self,
2435        msg: Message<'a>,
2436        binder: Option<&'a dyn vox_types::ChannelBinder>,
2437        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2438    ) -> Result<(), TrySendError<()>> {
2439        let connection_id = msg.connection_id;
2440        let (batch, _result_rx) = self
2441            .prepare_outbound_batch(msg, binder, forwarded_schemas)
2442            .map_err(|_| TrySendError::Closed(()))?;
2443        self.outbound_tx.try_send(batch).map_err(|err| match err {
2444            tokio_mpsc::error::TrySendError::Full(_) => {
2445                if let Some(observer) = &self.observer {
2446                    observer
2447                        .driver_event(vox_types::DriverEvent::OutboundQueueFull { connection_id });
2448                }
2449                TrySendError::Full(())
2450            }
2451            tokio_mpsc::error::TrySendError::Closed(_) => {
2452                if let Some(observer) = &self.observer {
2453                    observer.driver_event(vox_types::DriverEvent::OutboundQueueClosed {
2454                        connection_id,
2455                    });
2456                }
2457                TrySendError::Closed(())
2458            }
2459        })
2460    }
2461
2462    /// Record that an incoming call was received, so we can look up the
2463    /// method_id when sending the response.
2464    pub(crate) fn record_incoming_call(
2465        &self,
2466        conn_id: ConnectionId,
2467        request_id: RequestId,
2468        method_id: vox_types::MethodId,
2469    ) {
2470        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2471        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2472        vox_types::dlog!(
2473            "[schema] record_incoming_call: conn={:?} req={:?} method={:?}",
2474            conn_id,
2475            request_id,
2476            method_id
2477        );
2478        conn_state
2479            .lock()
2480            .expect("send conn state mutex poisoned")
2481            .inflight_incoming
2482            .insert(request_id, method_id);
2483    }
2484
2485    pub(crate) fn take_outgoing_call_method(
2486        &self,
2487        conn_id: ConnectionId,
2488        request_id: RequestId,
2489    ) -> Option<vox_types::MethodId> {
2490        let inner = self.inner.lock().expect("session core mutex poisoned");
2491        inner.conns.get(&conn_id).and_then(|conn_state| {
2492            conn_state
2493                .lock()
2494                .expect("send conn state mutex poisoned")
2495                .inflight_outgoing
2496                .remove(&request_id)
2497        })
2498    }
2499
2500    pub(crate) fn prepare_response_for_method(
2501        &self,
2502        conn_id: ConnectionId,
2503        request_id: RequestId,
2504        method_id: vox_types::MethodId,
2505        response: &mut RequestResponse<'_>,
2506    ) {
2507        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2508        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2509        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2510        let key = (vox_types::BindingDirection::Response, method_id);
2511        if conn_state
2512            .send_tracker
2513            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2514        {
2515            response.schemas = Default::default();
2516            return;
2517        }
2518
2519        let prepared = match &response.ret {
2520            vox_types::Payload::Value { shape, .. } => {
2521                match Self::get_or_plan_binding_for_shape(
2522                    &mut conn_state,
2523                    key,
2524                    request_id,
2525                    "response",
2526                    shape,
2527                ) {
2528                    Some(prepared) => prepared,
2529                    None => return,
2530                }
2531            }
2532            vox_types::Payload::PostcardBytes(_) => {
2533                tracing::error!(
2534                    "schema attachment failed: missing forwarded response schemas for method {:?}",
2535                    method_id
2536                );
2537                return;
2538            }
2539        };
2540        response.schemas = prepared.to_cbor();
2541    }
2542
2543    /// Prepare response schemas from an explicit canonical root type and schema source.
2544    pub(crate) fn prepare_response_from_source(
2545        &self,
2546        conn_id: ConnectionId,
2547        _request_id: RequestId,
2548        method_id: vox_types::MethodId,
2549        root_type: &vox_types::TypeRef,
2550        source: &dyn vox_types::SchemaSource,
2551        response: &mut RequestResponse<'_>,
2552    ) {
2553        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2554        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2555        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2556        let key = (vox_types::BindingDirection::Response, method_id);
2557        if conn_state
2558            .send_tracker
2559            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2560        {
2561            response.schemas = Default::default();
2562            return;
2563        }
2564        let prepared =
2565            Self::get_or_plan_binding_from_source(&mut conn_state, key, root_type, source);
2566        response.schemas = prepared.to_cbor();
2567    }
2568
2569    /// Prepare response schemas for a replay using the running code's
2570    /// static `&'static Shape` for the response type. Same connection-
2571    /// scoped dedup as fresh responses.
2572    pub(crate) fn prepare_response_from_shape(
2573        &self,
2574        conn_id: ConnectionId,
2575        request_id: RequestId,
2576        method_id: vox_types::MethodId,
2577        response_shape: &'static Shape,
2578        response: &mut RequestResponse<'_>,
2579    ) {
2580        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2581        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2582        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2583        let key = (vox_types::BindingDirection::Response, method_id);
2584        if conn_state
2585            .send_tracker
2586            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2587        {
2588            response.schemas = Default::default();
2589            return;
2590        }
2591        let prepared = match Self::get_or_plan_binding_for_shape(
2592            &mut conn_state,
2593            key,
2594            request_id,
2595            "response",
2596            response_shape,
2597        ) {
2598            Some(prepared) => prepared,
2599            None => return,
2600        };
2601        response.schemas = prepared.to_cbor();
2602    }
2603
2604    fn get_or_plan_binding_for_shape(
2605        conn_state: &mut SendConnState,
2606        key: (vox_types::BindingDirection, vox_types::MethodId),
2607        request_id: RequestId,
2608        kind: &str,
2609        shape: &'static Shape,
2610    ) -> Option<vox_types::PreparedSchemaPlan> {
2611        if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2612            return Some(prepared.clone());
2613        }
2614        match vox_types::SchemaSendTracker::plan_for_shape(shape) {
2615            Ok(prepared) => {
2616                vox_types::dlog!(
2617                    "[schema] planned {} {} schemas for method {:?} (req {:?})",
2618                    prepared.schemas.len(),
2619                    kind,
2620                    key.1,
2621                    request_id
2622                );
2623                conn_state.planned_bindings.insert(key, prepared.clone());
2624                Some(prepared)
2625            }
2626            Err(e) => {
2627                tracing::error!("schema extraction failed: {e}");
2628                None
2629            }
2630        }
2631    }
2632
2633    fn get_or_plan_binding_from_source(
2634        conn_state: &mut SendConnState,
2635        key: (vox_types::BindingDirection, vox_types::MethodId),
2636        root_type: &vox_types::TypeRef,
2637        source: &dyn vox_types::SchemaSource,
2638    ) -> vox_types::PreparedSchemaPlan {
2639        if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2640            return prepared.clone();
2641        }
2642        let prepared = vox_types::SchemaSendTracker::plan_from_source(root_type, source);
2643        conn_state.planned_bindings.insert(key, prepared.clone());
2644        prepared
2645    }
2646
2647    fn plan_response_schema_send(
2648        conn_state: &mut SendConnState,
2649        request_id: RequestId,
2650        method_id: vox_types::MethodId,
2651        response: &mut RequestResponse<'_>,
2652        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2653    ) -> Option<PendingSchemaSend> {
2654        if conn_state
2655            .send_tracker
2656            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2657        {
2658            response.schemas = Default::default();
2659            return None;
2660        }
2661
2662        let key = (vox_types::BindingDirection::Response, method_id);
2663        let prepared = if !response.schemas.is_empty() {
2664            conn_state
2665                .planned_bindings
2666                .get(&key)
2667                .cloned()
2668                .unwrap_or_else(|| {
2669                    let prepared_payload = vox_types::SchemaPayload::from_cbor(&response.schemas.0)
2670                        .expect("prepared schema payloads must be valid CBOR");
2671                    vox_types::PreparedSchemaPlan {
2672                        schemas: prepared_payload.schemas,
2673                        root: prepared_payload.root,
2674                    }
2675                })
2676        } else {
2677            match &response.ret {
2678                vox_types::Payload::Value { shape, .. } => Self::get_or_plan_binding_for_shape(
2679                    conn_state, key, request_id, "response", shape,
2680                )?,
2681                vox_types::Payload::PostcardBytes(_) => {
2682                    let Some(source) = forwarded_schemas else {
2683                        tracing::error!(
2684                            "schema attachment failed: missing forwarded response schemas for method {:?}",
2685                            method_id
2686                        );
2687                        return None;
2688                    };
2689                    let Some(root) = source.get_remote_response_root(method_id) else {
2690                        tracing::error!(
2691                            "schema attachment failed: missing forwarded response root for method {:?}",
2692                            method_id
2693                        );
2694                        return None;
2695                    };
2696                    Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2697                }
2698            }
2699        };
2700
2701        Some(PendingSchemaSend {
2702            method_id,
2703            direction: vox_types::BindingDirection::Response,
2704            prepared,
2705        })
2706    }
2707
2708    fn plan_call_schema_send(
2709        conn_state: &mut SendConnState,
2710        request_id: RequestId,
2711        method_id: vox_types::MethodId,
2712        call: &mut vox_types::RequestCall<'_>,
2713        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2714    ) -> Option<PendingSchemaSend> {
2715        conn_state.inflight_outgoing.insert(request_id, method_id);
2716        if conn_state
2717            .send_tracker
2718            .has_sent_binding(method_id, vox_types::BindingDirection::Args)
2719        {
2720            call.schemas = Default::default();
2721            return None;
2722        }
2723
2724        let key = (vox_types::BindingDirection::Args, method_id);
2725        let prepared = match &call.args {
2726            vox_types::Payload::Value { shape, .. } => {
2727                Self::get_or_plan_binding_for_shape(conn_state, key, request_id, "args", shape)?
2728            }
2729            vox_types::Payload::PostcardBytes(_) => {
2730                let Some(source) = forwarded_schemas else {
2731                    tracing::error!(
2732                        "schema attachment failed: missing forwarded args schemas for method {:?}",
2733                        method_id
2734                    );
2735                    return None;
2736                };
2737                let Some(root) = source.get_remote_args_root(method_id) else {
2738                    tracing::error!(
2739                        "schema attachment failed: missing forwarded args root for method {:?}",
2740                        method_id
2741                    );
2742                    return None;
2743                };
2744                Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2745            }
2746        };
2747
2748        Some(PendingSchemaSend {
2749            method_id,
2750            direction: vox_types::BindingDirection::Args,
2751            prepared,
2752        })
2753    }
2754
2755    fn replace_tx_and_reset_schemas(&self, tx: Arc<dyn DynConduitTx>) {
2756        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2757        inner.tx = tx;
2758        inner.conns.clear();
2759    }
2760}
2761
/// Conduit halves plus handshake outcome, as produced by
/// [`ConduitRecoverer::next_conduit`].
pub(crate) struct RecoveredConduit {
    /// Type-erased sending half of the conduit.
    pub tx: Arc<dyn DynConduitTx>,
    /// Type-erased receiving half of the conduit.
    pub rx: Box<dyn DynConduitRx>,
    /// Handshake result that accompanies the two halves.
    pub handshake: HandshakeResult,
}
2767
/// Supplies a [`RecoveredConduit`] on demand.
// NOTE(review): presumably used to obtain a replacement conduit when a session
// is resumed — confirm against the callers of `next_conduit`.
pub(crate) trait ConduitRecoverer: MaybeSend {
    /// Asynchronously produce the next conduit.
    ///
    /// `resume_key` is an optional [`SessionResumeKey`] passed through to the
    /// implementation. The returned future resolves to the conduit bundle or
    /// to a [`SessionError`].
    fn next_conduit<'a>(
        &'a mut self,
        resume_key: Option<&'a SessionResumeKey>,
    ) -> BoxFut<'a, Result<RecoveredConduit, SessionError>>;
}
2774
/// Type-erased sending half of a conduit.
///
/// A blanket implementation in this file covers every
/// `ConduitTx<Msg = MessageFamily>` that is `MaybeSend + MaybeSync + 'static`.
pub trait DynConduitTx: MaybeSend + MaybeSync {
    /// Prepare `msg` for sending and return the [`OutboundSend`] that carries
    /// out the send.
    ///
    /// `binder` is an optional channel binder supplied for the preparation
    /// step. In the blanket implementation, preparation happens synchronously
    /// inside this call and a preparation failure is returned as an
    /// `std::io::Error`; the actual send happens when the returned value is
    /// awaited.
    fn prepare_msg<'a>(
        self: Arc<Self>,
        msg: Message<'a>,
        binder: Option<&'a dyn vox_types::ChannelBinder>,
    ) -> std::io::Result<OutboundSend>;
}
/// Type-erased receiving half of a conduit.
///
/// A blanket implementation in this file covers every
/// `ConduitRx<Msg = MessageFamily>` that is `MaybeSend`.
pub trait DynConduitRx: MaybeSend {
    /// Receive the next message.
    ///
    /// The future resolves to `Ok(Some(message))`, `Ok(None)`, or an
    /// `std::io::Error`.
    // NOTE(review): `Ok(None)` presumably signals that the peer closed the
    // conduit — confirm against `ConduitRx::recv`.
    fn recv_msg<'a>(&'a mut self)
    -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>>;

    /// Descriptors that arrived with the frame from the most recent
    /// `recv_msg`. Threaded alongside the message to the typed-decode site.
    fn take_frame_fds(&mut self) -> vox_types::FrameFds;
}
2790
2791// r[impl zerocopy.send]
2792// r[impl zerocopy.framing.pipeline.outgoing]
2793impl<T> DynConduitTx for T
2794where
2795    T: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
2796{
2797    fn prepare_msg<'a>(
2798        self: Arc<Self>,
2799        msg: Message<'a>,
2800        binder: Option<&'a dyn vox_types::ChannelBinder>,
2801    ) -> std::io::Result<OutboundSend> {
2802        let prepared = if let Some(binder) = binder {
2803            vox_types::with_channel_binder(binder, || self.prepare_send(msg))
2804        } else {
2805            self.prepare_send(msg)
2806        };
2807        let prepared = prepared.map_err(|e| std::io::Error::other(e.to_string()))?;
2808        Ok(Box::pin(async move {
2809            self.send_prepared(prepared)
2810                .await
2811                .map_err(|e| std::io::Error::other(e.to_string()))
2812        }))
2813    }
2814}
2815
2816impl<T> DynConduitRx for T
2817where
2818    T: ConduitRx<Msg = MessageFamily> + MaybeSend,
2819{
2820    fn recv_msg<'a>(
2821        &'a mut self,
2822    ) -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>> {
2823        Box::pin(async move {
2824            self.recv()
2825                .await
2826                .map_err(|error| std::io::Error::other(error.to_string()))
2827        })
2828    }
2829
2830    fn take_frame_fds(&mut self) -> vox_types::FrameFds {
2831        ConduitRx::take_frame_fds(self)
2832    }
2833}
2834
2835#[cfg(test)]
2836mod tests {
2837    use moire::sync::mpsc;
2838    use vox_types::{
2839        Backing, Conduit, ConnectionAccept, ConnectionReject, HandshakeResult, SelfRef,
2840    };
2841
2842    use super::*;
2843
2844    fn make_session() -> Session {
2845        let (a, b) = crate::memory_link_pair(32);
2846        // Keep the peer link alive so sess_core sends don't fail with broken pipe.
2847        std::mem::forget(b);
2848        let conduit = crate::BareConduit::new(a);
2849        let (tx, rx) = conduit.split();
2850        let (_open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open.test", 4);
2851        let (_close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close.test", 4);
2852        let (_resume_tx, resume_rx) = mpsc::channel::<ResumeRequest>("session.resume.test", 1);
2853        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control.test");
2854        Session::pre_handshake(
2855            tx, rx, None, open_rx, close_rx, resume_rx, control_tx, control_rx, None, false, None,
2856            None, None,
2857        )
2858    }
2859
2860    fn resumed_handshake(
2861        our_settings: ConnectionSettings,
2862        peer_settings: ConnectionSettings,
2863    ) -> HandshakeResult {
2864        HandshakeResult {
2865            role: SessionRole::Initiator,
2866            our_settings,
2867            peer_settings,
2868            peer_supports_retry: true,
2869            session_resume_key: Some(SessionResumeKey([7; 16])),
2870            peer_resume_key: None,
2871            our_schema: vec![],
2872            peer_schema: vec![],
2873            peer_metadata: vec![],
2874        }
2875    }
2876
2877    fn accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2878        SelfRef::owning(
2879            Backing::Boxed(Box::<[u8]>::default()),
2880            ConnectionAccept {
2881                connection_settings: ConnectionSettings {
2882                    parity: Parity::Even,
2883                    max_concurrent_requests: 64,
2884                    initial_channel_credit: 16,
2885                },
2886                metadata: vec![],
2887            },
2888        )
2889    }
2890
2891    fn zero_credit_accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2892        SelfRef::owning(
2893            Backing::Boxed(Box::<[u8]>::default()),
2894            ConnectionAccept {
2895                connection_settings: ConnectionSettings {
2896                    parity: Parity::Even,
2897                    max_concurrent_requests: 64,
2898                    initial_channel_credit: 0,
2899                },
2900                metadata: vec![],
2901            },
2902        )
2903    }
2904
2905    fn reject_ref() -> SelfRef<ConnectionReject<'static>> {
2906        SelfRef::owning(
2907            Backing::Boxed(Box::<[u8]>::default()),
2908            ConnectionReject { metadata: vec![] },
2909        )
2910    }
2911
2912    #[tokio::test]
2913    async fn duplicate_connection_accept_is_ignored_after_first() {
2914        let mut session = make_session();
2915        let conn_id = ConnectionId(1);
2916        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2917
2918        session.conns.insert(
2919            conn_id,
2920            ConnectionSlot::PendingOutbound(PendingOutboundData {
2921                local_settings: ConnectionSettings {
2922                    parity: Parity::Odd,
2923                    max_concurrent_requests: 64,
2924                    initial_channel_credit: 16,
2925                },
2926                result_tx: Some(result_tx),
2927            }),
2928        );
2929
2930        session.handle_inbound_accept(conn_id, accept_ref());
2931        let handle = result_rx
2932            .await
2933            .expect("pending outbound result should resolve")
2934            .expect("accept should resolve as Ok");
2935        assert_eq!(handle.connection_id(), conn_id);
2936
2937        session.handle_inbound_accept(conn_id, accept_ref());
2938        assert!(
2939            matches!(
2940                session.conns.get(&conn_id),
2941                Some(ConnectionSlot::Active(ConnectionState { id, .. })) if *id == conn_id
2942            ),
2943            "duplicate accept should keep existing active connection state"
2944        );
2945    }
2946
2947    #[tokio::test]
2948    async fn duplicate_connection_reject_is_ignored_after_first() {
2949        let mut session = make_session();
2950        let conn_id = ConnectionId(1);
2951        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2952
2953        session.conns.insert(
2954            conn_id,
2955            ConnectionSlot::PendingOutbound(PendingOutboundData {
2956                local_settings: ConnectionSettings {
2957                    parity: Parity::Odd,
2958                    max_concurrent_requests: 64,
2959                    initial_channel_credit: 16,
2960                },
2961                result_tx: Some(result_tx),
2962            }),
2963        );
2964
2965        session.handle_inbound_reject(conn_id, reject_ref());
2966        let result = result_rx
2967            .await
2968            .expect("pending outbound result should resolve");
2969        assert!(
2970            matches!(result, Err(SessionError::Rejected(_))),
2971            "expected rejection, got: {result:?}"
2972        );
2973
2974        session.handle_inbound_reject(conn_id, reject_ref());
2975        assert!(
2976            !session.conns.contains_key(&conn_id),
2977            "duplicate reject should not recreate connection state"
2978        );
2979    }
2980
2981    // r[verify rpc.flow-control.credit.initial.zero]
2982    #[tokio::test]
2983    async fn inbound_accept_with_zero_initial_credit_rejects_pending_open() {
2984        let mut session = make_session();
2985        let conn_id = ConnectionId(1);
2986        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2987
2988        session.conns.insert(
2989            conn_id,
2990            ConnectionSlot::PendingOutbound(PendingOutboundData {
2991                local_settings: ConnectionSettings {
2992                    parity: Parity::Odd,
2993                    max_concurrent_requests: 64,
2994                    initial_channel_credit: 16,
2995                },
2996                result_tx: Some(result_tx),
2997            }),
2998        );
2999
3000        session.handle_inbound_accept(conn_id, zero_credit_accept_ref());
3001        let result = result_rx
3002            .await
3003            .expect("pending outbound result should resolve");
3004        assert!(
3005            matches!(
3006                result,
3007                Err(SessionError::Protocol(ref message))
3008                    if message == "initial_channel_credit must be greater than zero"
3009            ),
3010            "expected zero-credit protocol error, got: {result:?}"
3011        );
3012        assert!(
3013            !session.conns.contains_key(&conn_id),
3014            "zero-credit accept should not create an active connection"
3015        );
3016    }
3017
3018    #[test]
3019    fn out_of_order_accept_or_reject_without_pending_is_ignored() {
3020        let mut session = make_session();
3021        let conn_id = ConnectionId(99);
3022
3023        session.handle_inbound_accept(conn_id, accept_ref());
3024        session.handle_inbound_reject(conn_id, reject_ref());
3025
3026        assert!(
3027            session.conns.is_empty(),
3028            "out-of-order accept/reject should not mutate empty connection table"
3029        );
3030    }
3031
3032    #[tokio::test]
3033    async fn close_request_clears_pending_outbound_open() {
3034        let mut session = make_session();
3035        let (open_result_tx, open_result_rx) = moire::sync::oneshot::channel("session.open.result");
3036        let (close_result_tx, close_result_rx) =
3037            moire::sync::oneshot::channel("session.close.result");
3038
3039        session.conns.insert(
3040            ConnectionId(1),
3041            ConnectionSlot::PendingOutbound(PendingOutboundData {
3042                local_settings: ConnectionSettings {
3043                    parity: Parity::Odd,
3044                    max_concurrent_requests: 64,
3045                    initial_channel_credit: 16,
3046                },
3047                result_tx: Some(open_result_tx),
3048            }),
3049        );
3050
3051        session
3052            .handle_close_request(CloseRequest {
3053                conn_id: ConnectionId(1),
3054                metadata: vec![],
3055                result_tx: close_result_tx,
3056            })
3057            .await;
3058
3059        let close_result = close_result_rx
3060            .await
3061            .expect("close result should be delivered");
3062        assert!(
3063            close_result.is_ok(),
3064            "close should succeed for pending outbound connection"
3065        );
3066
3067        assert!(
3068            open_result_rx.await.is_err(),
3069            "pending open result channel should be closed once the pending slot is removed"
3070        );
3071    }
3072
3073    #[test]
3074    fn resume_rejects_changed_local_root_settings() {
3075        let mut session = make_session();
3076        let local_settings = ConnectionSettings {
3077            parity: Parity::Odd,
3078            max_concurrent_requests: 64,
3079            initial_channel_credit: 16,
3080        };
3081        let peer_settings = ConnectionSettings {
3082            parity: Parity::Even,
3083            max_concurrent_requests: 64,
3084            initial_channel_credit: 16,
3085        };
3086        let _root = session
3087            .establish_from_handshake(resumed_handshake(
3088                local_settings.clone(),
3089                peer_settings.clone(),
3090            ))
3091            .expect("initial handshake should establish session");
3092
3093        let (link_a, _link_b) = crate::memory_link_pair(32);
3094        let conduit = crate::BareConduit::new(link_a);
3095        let (tx, rx) = conduit.split();
3096
3097        let result = session.resume_from_handshake(
3098            Arc::new(tx),
3099            Box::new(rx),
3100            resumed_handshake(
3101                ConnectionSettings {
3102                    parity: Parity::Odd,
3103                    max_concurrent_requests: 65,
3104                    initial_channel_credit: 16,
3105                },
3106                peer_settings,
3107            ),
3108        );
3109
3110        assert!(
3111            matches!(
3112                &result,
3113                Err(SessionError::Protocol(message))
3114                    if message == "local root settings changed across session resume"
3115            ),
3116            "expected local-root-settings mismatch, got: {result:?}"
3117        );
3118    }
3119
3120    #[test]
3121    fn resume_rejects_changed_peer_root_settings() {
3122        let mut session = make_session();
3123        let local_settings = ConnectionSettings {
3124            parity: Parity::Odd,
3125            max_concurrent_requests: 64,
3126            initial_channel_credit: 16,
3127        };
3128        let peer_settings = ConnectionSettings {
3129            parity: Parity::Even,
3130            max_concurrent_requests: 64,
3131            initial_channel_credit: 16,
3132        };
3133        let _root = session
3134            .establish_from_handshake(resumed_handshake(
3135                local_settings.clone(),
3136                peer_settings.clone(),
3137            ))
3138            .expect("initial handshake should establish session");
3139
3140        let (link_a, _link_b) = crate::memory_link_pair(32);
3141        let conduit = crate::BareConduit::new(link_a);
3142        let (tx, rx) = conduit.split();
3143
3144        let result = session.resume_from_handshake(
3145            Arc::new(tx),
3146            Box::new(rx),
3147            resumed_handshake(
3148                local_settings,
3149                ConnectionSettings {
3150                    parity: Parity::Even,
3151                    max_concurrent_requests: 65,
3152                    initial_channel_credit: 16,
3153                },
3154            ),
3155        );
3156
3157        assert!(
3158            matches!(
3159                &result,
3160                Err(SessionError::Protocol(message))
3161                    if message == "peer root settings changed across session resume"
3162            ),
3163            "expected peer-root-settings mismatch, got: {result:?}"
3164        );
3165    }
3166}