Skip to main content

vox_core/session/
mod.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    future::Future,
4    pin::Pin,
5    sync::Arc,
6    time::Duration,
7};
8
9use facet_core::Shape;
10use moire::sync::mpsc;
11use tokio::sync::{mpsc as tokio_mpsc, oneshot as tokio_oneshot, watch};
12use tracing::{trace, warn};
13use vox_types::{
14    BoxFut, ChannelMessage, ConduitRx, ConduitTx, ConnectionAccept, ConnectionClose, ConnectionId,
15    ConnectionOpen, ConnectionReject, ConnectionSettings, Handler, HandshakeResult, IdAllocator,
16    MaybeSend, MaybeSync, Message, MessageFamily, MessagePayload, Metadata, Parity, RequestBody,
17    RequestId, RequestMessage, RequestResponse, SchemaMessage, SelfRef, SessionResumeKey,
18    SessionRole, TrySendError, VoxDebugSnapshot, VoxObserverHandle,
19};
20use vox_types::{
21    ConnectionCloseReason, ConnectionDebugSnapshot, ConnectionDebugState, DecodeErrorKind,
22    DriverTaskStatus,
23};
24
25mod builders;
26pub use builders::*;
27
/// Session-level protocol keepalive configuration.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct SessionKeepaliveConfig {
    /// Interval between keepalive pings.
    // NOTE(review): presumably the spacing between consecutive pings sent by
    // the session run loop — confirm against the run loop implementation.
    pub ping_interval: Duration,
    /// How long to wait for a pong.
    // NOTE(review): presumably the peer is treated as unresponsive once this
    // elapses without a matching pong — confirm against the run loop.
    pub pong_timeout: Duration,
}
34
35// ---------------------------------------------------------------------------
36// Connection acceptor trait
37// ---------------------------------------------------------------------------
38
39/// Metadata wrapper with typed getters for well-known `vox-*` keys.
40///
41/// Passed to [`ConnectionAcceptor::accept`] when a peer opens a connection.
42pub struct ConnectionRequest<'a> {
43    metadata: &'a [vox_types::MetadataEntry<'a>],
44    service: &'a str,
45}
46
47impl<'a> ConnectionRequest<'a> {
48    /// Build a connection request from metadata.
49    ///
50    /// Returns an error if the required `vox-service` metadata key is missing.
51    pub fn new(metadata: &'a [vox_types::MetadataEntry<'a>]) -> Result<Self, SessionError> {
52        let service = vox_types::metadata_get_str(metadata, "vox-service").ok_or_else(|| {
53            SessionError::Protocol("missing required vox-service metadata".into())
54        })?;
55        Ok(Self { metadata, service })
56    }
57
58    /// The requested service name (`vox-service` metadata key).
59    pub fn service(&self) -> &str {
60        self.service
61    }
62
63    /// The transport type (`vox-transport` metadata key).
64    pub fn transport(&self) -> Option<&str> {
65        vox_types::metadata_get_str(self.metadata, "vox-transport")
66    }
67
68    /// The peer address (`vox-peer-addr` metadata key).
69    pub fn peer_addr(&self) -> Option<&str> {
70        vox_types::metadata_get_str(self.metadata, "vox-peer-addr")
71    }
72
73    /// Whether this is a root or virtual connection.
74    pub fn is_root(&self) -> bool {
75        !self.is_virtual()
76    }
77
78    /// Whether this is a virtual connection.
79    pub fn is_virtual(&self) -> bool {
80        vox_types::metadata_get_str(self.metadata, "vox-connection-kind") == Some("virtual")
81    }
82
83    /// Look up a string value by key.
84    pub fn get_str(&self, key: &str) -> Option<&str> {
85        vox_types::metadata_get_str(self.metadata, key)
86    }
87
88    /// Look up a u64 value by key.
89    pub fn get_u64(&self, key: &str) -> Option<u64> {
90        vox_types::metadata_get_u64(self.metadata, key)
91    }
92
93    /// Access the raw metadata entries.
94    pub fn metadata(&self) -> &[vox_types::MetadataEntry<'a>] {
95        self.metadata
96    }
97}
98
/// A connection that has been opened but not yet accepted.
///
/// The acceptor receives this and decides its fate by calling one of:
/// - `handle_with(handler)` — run a Driver with this handler (common case)
/// - `handle_with_client(handler)` — like `handle_with`, but also returns a
///   typed client for the peer
/// - `proxy_to(other_handle)` — pipe messages to/from another connection
/// - `into_handle()` — take the raw ConnectionHandle for custom use
///
/// Dropping a `PendingConnection` without calling any of these logs a warning
/// and requests that the connection be closed.
pub struct PendingConnection {
    /// The underlying handle; `None` once one of the consuming methods has
    /// taken it, which is how `Drop` tells "consumed" from "abandoned".
    handle: Option<ConnectionHandle>,
    /// Optional shared slot that receives the driver's `Caller` when the
    /// connection is accepted via `handle_with` / `handle_with_client`.
    caller_slot: Option<Arc<std::sync::Mutex<Option<crate::Caller>>>>,
    /// Optional operation store passed to the driver when it is created.
    operation_store: Option<Arc<dyn crate::OperationStore>>,
}
110
111impl PendingConnection {
112    fn new(handle: ConnectionHandle) -> Self {
113        Self {
114            handle: Some(handle),
115            caller_slot: None,
116            operation_store: None,
117        }
118    }
119
120    /// Create a PendingConnection that captures the Caller when handle_with is called.
121    fn with_caller_slot(
122        handle: ConnectionHandle,
123        caller_slot: Arc<std::sync::Mutex<Option<crate::Caller>>>,
124        operation_store: Option<Arc<dyn crate::OperationStore>>,
125    ) -> Self {
126        Self {
127            handle: Some(handle),
128            caller_slot: Some(caller_slot),
129            operation_store,
130        }
131    }
132
133    /// Accept this connection and run a Driver with the given handler.
134    pub fn handle_with(mut self, handler: impl Handler<crate::DriverReplySink> + 'static) {
135        let handle = self
136            .handle
137            .take()
138            .expect("PendingConnection already consumed");
139        let conn_id = handle.connection_id();
140        trace!(%conn_id, "PendingConnection::handle_with: creating driver");
141        let mut driver = match self.operation_store.take() {
142            Some(store) => crate::Driver::with_operation_store(handle, handler, store),
143            None => crate::Driver::new(handle, handler),
144        };
145        if let Some(slot) = &self.caller_slot {
146            let caller = crate::Caller::new(driver.caller());
147            *slot.lock().unwrap() = Some(caller);
148        }
149        #[cfg(not(target_arch = "wasm32"))]
150        tokio::spawn(async move {
151            trace!(%conn_id, "PendingConnection driver starting");
152            driver.run().await;
153            trace!(%conn_id, "PendingConnection driver exited");
154        });
155        #[cfg(target_arch = "wasm32")]
156        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
157    }
158
159    /// Accept this connection, run a Driver, and return a typed client for the peer.
160    pub fn handle_with_client<C: crate::FromVoxSession>(
161        mut self,
162        handler: impl Handler<crate::DriverReplySink> + 'static,
163    ) -> C {
164        let handle = self
165            .handle
166            .take()
167            .expect("PendingConnection already consumed");
168        let conn_id = handle.connection_id();
169        trace!(%conn_id, "PendingConnection::handle_with_client: creating driver");
170        let mut driver = match self.operation_store.take() {
171            Some(store) => crate::Driver::with_operation_store(handle, handler, store),
172            None => crate::Driver::new(handle, handler),
173        };
174        let caller = crate::Caller::new(driver.caller());
175        if let Some(slot) = &self.caller_slot {
176            *slot.lock().unwrap() = Some(caller.clone());
177        }
178        #[cfg(not(target_arch = "wasm32"))]
179        tokio::spawn(async move {
180            trace!(%conn_id, "PendingConnection driver starting");
181            driver.run().await;
182            trace!(%conn_id, "PendingConnection driver exited");
183        });
184        #[cfg(target_arch = "wasm32")]
185        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
186        C::from_vox_session(caller, None)
187    }
188
189    /// Accept this connection and proxy all traffic to/from another connection.
190    pub fn proxy_to(mut self, other: ConnectionHandle) {
191        let handle = self
192            .handle
193            .take()
194            .expect("PendingConnection already consumed");
195        #[cfg(not(target_arch = "wasm32"))]
196        tokio::spawn(async move {
197            let _ = proxy_connections(handle, other).await;
198        });
199        #[cfg(target_arch = "wasm32")]
200        wasm_bindgen_futures::spawn_local(async move {
201            let _ = proxy_connections(handle, other).await;
202        });
203    }
204
205    /// Take the raw ConnectionHandle for custom use.
206    pub fn into_handle(mut self) -> ConnectionHandle {
207        self.handle
208            .take()
209            .expect("PendingConnection already consumed")
210    }
211}
212
213impl Drop for PendingConnection {
214    fn drop(&mut self) {
215        if let Some(handle) = self.handle.take() {
216            let conn_id = handle.connection_id();
217            warn!(%conn_id, "PendingConnection dropped without being consumed — closing connection");
218            if let Some(tx) = handle.control_tx.as_ref() {
219                let _ = send_drop_control(tx, DropControlRequest::Close(conn_id));
220            }
221        }
222    }
223}
224
// r[impl rpc.virtual-connection.accept]
/// Decides what happens to a connection that a peer has opened.
///
/// `accept` receives the request metadata view and the still-pending
/// connection. Returning `Ok(())` means the implementation has taken
/// responsibility for `connection` (for example via
/// `PendingConnection::handle_with`).
// NOTE(review): the `Err(Metadata)` value is presumably forwarded to the peer
// as rejection metadata — confirm against the session run loop.
pub trait ConnectionAcceptor: MaybeSend + MaybeSync + 'static {
    fn accept(
        &self,
        request: &ConnectionRequest,
        connection: PendingConnection,
    ) -> Result<(), Metadata<'static>>;
}
233
234/// Any `Handler<DriverReplySink>` is automatically a `ConnectionAcceptor`.
235impl<H> ConnectionAcceptor for H
236where
237    H: Handler<crate::DriverReplySink> + Clone + MaybeSend + MaybeSync + 'static,
238{
239    fn accept(
240        &self,
241        _request: &ConnectionRequest,
242        connection: PendingConnection,
243    ) -> Result<(), Metadata<'static>> {
244        connection.handle_with(self.clone());
245        Ok(())
246    }
247}
248
249/// Wrapper that turns a closure into a `ConnectionAcceptor`.
250pub struct AcceptorFn<F>(pub F);
251
252impl<F> ConnectionAcceptor for AcceptorFn<F>
253where
254    F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
255        + MaybeSend
256        + MaybeSync
257        + 'static,
258{
259    fn accept(
260        &self,
261        request: &ConnectionRequest,
262        connection: PendingConnection,
263    ) -> Result<(), Metadata<'static>> {
264        (self.0)(request, connection)
265    }
266}
267
268/// Create a `ConnectionAcceptor` from a closure.
269pub fn acceptor_fn<F>(f: F) -> AcceptorFn<F>
270where
271    F: Fn(&ConnectionRequest, PendingConnection) -> Result<(), Metadata<'static>>
272        + MaybeSend
273        + MaybeSync
274        + 'static,
275{
276    AcceptorFn(f)
277}
278
279// ---------------------------------------------------------------------------
280// Open/close request types (from SessionHandle → run loop)
281// ---------------------------------------------------------------------------
282
/// Request sent by `SessionHandle::open_connection` asking for a new
/// connection to be opened.
struct OpenRequest {
    /// Settings supplied by the caller for the new connection.
    settings: ConnectionSettings,
    /// Metadata supplied by the caller for the open request.
    metadata: Metadata<'static>,
    /// One-shot channel on which the requester awaits the resulting handle or
    /// error.
    result_tx: moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>,
}
288
/// Request sent by `SessionHandle::close_connection` asking for a connection
/// to be closed.
struct CloseRequest {
    /// The connection to close.
    conn_id: ConnectionId,
    /// Metadata supplied by the caller for the close request.
    metadata: Metadata<'static>,
    /// One-shot channel on which the requester awaits the outcome.
    result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
}
294
/// Request sent by `SessionHandle::resume_parts` carrying a new conduit pair
/// and the handshake result that goes with it.
struct ResumeRequest {
    /// Sending half of the conduit supplied by the caller.
    tx: Arc<dyn DynConduitTx>,
    /// Receiving half of the conduit supplied by the caller.
    rx: Box<dyn DynConduitRx>,
    /// Handshake result supplied by the caller.
    handshake_result: HandshakeResult,
    /// One-shot channel on which the requester awaits the outcome.
    result_tx: moire::sync::oneshot::Sender<Result<(), SessionError>>,
}
301
/// Control request delivered over the unbounded control channel.
///
/// Sent by `SessionHandle::shutdown` and by `PendingConnection`'s `Drop` impl.
#[derive(Debug, Clone, Copy)]
pub(crate) enum DropControlRequest {
    /// Shut down the entire session.
    Shutdown,
    /// Close the single connection with the given ID.
    Close(ConnectionId),
}
307
/// How a failed request is classified when reported through
/// `ConnectionSender::mark_failure`.
// NOTE(review): the exact meaning of each variant is not visible here.
// Presumably `Cancelled` means the request is known not to have completed and
// `Indeterminate` means its outcome is unknown — confirm against the driver.
#[derive(Clone, Copy, Debug)]
pub(crate) enum FailureDisposition {
    Cancelled,
    Indeterminate,
}
313
314#[cfg(not(target_arch = "wasm32"))]
315fn send_drop_control(
316    tx: &mpsc::UnboundedSender<DropControlRequest>,
317    req: DropControlRequest,
318) -> Result<(), ()> {
319    tx.send(req).map_err(|_| ())
320}
321
/// Push `req` onto the control channel (wasm32 targets, via `try_send`).
///
/// Any send failure is collapsed into `Err(())`.
#[cfg(target_arch = "wasm32")]
fn send_drop_control(
    tx: &mpsc::UnboundedSender<DropControlRequest>,
    req: DropControlRequest,
) -> Result<(), ()> {
    match tx.try_send(req) {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
329
330// ---------------------------------------------------------------------------
331// SessionHandle — cloneable handle for opening/closing virtual connections
332// ---------------------------------------------------------------------------
333
/// Cloneable handle for opening and closing virtual connections.
///
/// Returned by the session builder alongside the `Session` and root
/// `ConnectionHandle`. The session's `run()` loop must be running
/// concurrently for requests to be processed.
// r[impl rpc.virtual-connection.open]
#[derive(Clone)]
pub struct SessionHandle {
    /// Channel carrying `OpenRequest`s (see `open_connection`).
    open_tx: mpsc::Sender<OpenRequest>,
    /// Channel carrying `CloseRequest`s (see `close_connection`).
    close_tx: mpsc::Sender<CloseRequest>,
    /// Channel carrying `ResumeRequest`s (see `resume_parts`).
    resume_tx: mpsc::Sender<ResumeRequest>,
    /// Unbounded channel carrying `DropControlRequest`s (see `shutdown`).
    control_tx: mpsc::UnboundedSender<DropControlRequest>,
    /// Resume key exposed through `resume_key()`; `None` when the session has
    /// no resume key.
    resume_key: Option<SessionResumeKey>,
}
348
349impl SessionHandle {
350    /// Open a typed virtual connection on the session.
351    ///
352    /// Sends `vox-service` metadata automatically from the client's
353    /// `SERVICE_NAME`. Creates a `Driver` and spawns it, returning
354    /// a ready-to-use typed client.
355    pub async fn open<Client: crate::FromVoxSession>(
356        &self,
357        settings: ConnectionSettings,
358    ) -> Result<Client, SessionError> {
359        use crate::{Caller, Driver};
360        use vox_types::{MetadataEntry, MetadataFlags, MetadataValue};
361
362        let metadata: Metadata<'static> = vec![MetadataEntry {
363            key: crate::session::builders::VOX_SERVICE_METADATA_KEY.into(),
364            value: MetadataValue::String(Client::SERVICE_NAME.into()),
365            flags: MetadataFlags::NONE,
366        }];
367        let handle = self.open_connection(settings, metadata).await?;
368        let mut driver = Driver::new(handle, ());
369        let caller = Caller::new(driver.caller());
370        #[cfg(not(target_arch = "wasm32"))]
371        tokio::spawn(async move { driver.run().await });
372        #[cfg(target_arch = "wasm32")]
373        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
374        Ok(Client::from_vox_session(caller, None))
375    }
376
377    /// Open a new virtual connection on the session.
378    ///
379    /// Allocates a connection ID, sends `ConnectionOpen` to the peer, and
380    /// waits for `ConnectionAccept` or `ConnectionReject`. The session's
381    /// `run()` loop processes the response and completes the returned future.
382    // r[impl connection.open]
383    pub async fn open_connection(
384        &self,
385        settings: ConnectionSettings,
386        metadata: Metadata<'static>,
387    ) -> Result<ConnectionHandle, SessionError> {
388        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.open_result");
389        self.open_tx
390            .send(OpenRequest {
391                settings,
392                metadata,
393                result_tx,
394            })
395            .await
396            .map_err(|_| SessionError::Protocol("session closed".into()))?;
397        result_rx
398            .await
399            .map_err(|_| SessionError::Protocol("session closed".into()))?
400    }
401
402    /// Close a virtual connection.
403    ///
404    /// Sends `ConnectionClose` to the peer and removes the connection slot.
405    /// After this returns, no further messages will be routed to the
406    /// connection's driver.
407    // r[impl connection.close]
408    pub async fn close_connection(
409        &self,
410        conn_id: ConnectionId,
411        metadata: Metadata<'static>,
412    ) -> Result<(), SessionError> {
413        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.close_result");
414        self.close_tx
415            .send(CloseRequest {
416                conn_id,
417                metadata,
418                result_tx,
419            })
420            .await
421            .map_err(|_| SessionError::Protocol("session closed".into()))?;
422        result_rx
423            .await
424            .map_err(|_| SessionError::Protocol("session closed".into()))?
425    }
426
427    pub(crate) async fn resume_parts(
428        &self,
429        tx: Arc<dyn DynConduitTx>,
430        rx: Box<dyn DynConduitRx>,
431        handshake_result: HandshakeResult,
432    ) -> Result<(), SessionError> {
433        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.resume_result");
434        self.resume_tx
435            .send(ResumeRequest {
436                tx,
437                rx,
438                handshake_result,
439                result_tx,
440            })
441            .await
442            .map_err(|_| SessionError::Protocol("session closed".into()))?;
443        result_rx
444            .await
445            .map_err(|_| SessionError::Protocol("session closed".into()))?
446    }
447
448    /// Returns the session resume key, if the session is resumable.
449    pub fn resume_key(&self) -> Option<&SessionResumeKey> {
450        self.resume_key.as_ref()
451    }
452
453    /// Request shutdown of the entire session (root + all virtual connections).
454    pub fn shutdown(&self) -> Result<(), SessionError> {
455        send_drop_control(&self.control_tx, DropControlRequest::Shutdown)
456            .map_err(|_| SessionError::Protocol("session closed".into()))
457    }
458}
459
460// ---------------------------------------------------------------------------
461// Session
462// ---------------------------------------------------------------------------
463
/// Session state machine.
///
/// Owns the receiving side of the conduit, the table of connections, and the
/// receiving ends of the request channels whose senders live in
/// `SessionHandle`.
// r[impl session]
// r[impl rpc.one-service-per-connection]
pub struct Session {
    /// Conduit receiver
    rx: Box<dyn DynConduitRx>,

    // r[impl session.role]
    role: SessionRole,

    /// Our local parity — determines which connection IDs we allocate.
    // r[impl session.parity]
    parity: Parity,

    /// Shared core (for sending) — also held by all ConnectionSenders.
    sess_core: Arc<SessionCore>,
    /// Whether the peer supports retry; mirrored onto `ConnectionHandle`.
    peer_supports_retry: bool,
    /// Our settings for the root connection.
    local_root_settings: ConnectionSettings,
    /// The peer's settings for the root connection, when known.
    peer_root_settings: Option<ConnectionSettings>,
    /// Whether this session is resumable.
    // NOTE(review): presumably negotiated during the handshake — confirm.
    resumable: bool,
    /// Resume key for this session, if any.
    session_resume_key: Option<SessionResumeKey>,

    /// Connection state (active, pending inbound, pending outbound).
    conns: BTreeMap<ConnectionId, ConnectionSlot>,
    /// Whether the root connection was internally closed because all root callers dropped.
    root_closed_internal: bool,

    /// Allocator for outbound virtual connection IDs (uses session parity).
    conn_ids: IdAllocator<ConnectionId>,

    /// Callback for accepting inbound virtual connections.
    on_connection: Option<Arc<dyn ConnectionAcceptor>>,

    /// Receiver for open requests from SessionHandle.
    open_rx: mpsc::Receiver<OpenRequest>,

    /// Receiver for close requests from SessionHandle.
    close_rx: mpsc::Receiver<CloseRequest>,

    /// Receiver for resume requests from SessionHandle.
    resume_rx: mpsc::Receiver<ResumeRequest>,

    /// Sender/receiver for drop-driven session/connection control requests.
    control_tx: mpsc::UnboundedSender<DropControlRequest>,
    control_rx: mpsc::UnboundedReceiver<DropControlRequest>,

    /// Optional proactive keepalive runtime config for connection ID 0.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Watch channel carrying a `u64`; `ConnectionHandle::resumed_rx` is a
    /// receiver of the same value type.
    // NOTE(review): presumably a counter bumped on each successful resume —
    // confirm against the run loop.
    resume_notifier: watch::Sender<u64>,
    /// Optional conduit recoverer.
    // NOTE(review): presumably used to re-establish the conduit after a
    // transport failure — confirm against `ConduitRecoverer`.
    recoverer: Option<Box<dyn ConduitRecoverer>>,
    /// Optional time limit associated with recovery.
    // NOTE(review): presumably bounds how long recovery may take — confirm.
    recovery_timeout: Option<Duration>,
    /// Whether this session was registered in a `SessionRegistry`, meaning
    /// an external acceptor could route a reconnecting client to resume it.
    registered_in_registry: bool,

    /// Optional observer handle; also stored on each `ConnectionHandle`.
    observer: Option<VoxObserverHandle>,
}
521
/// Mutable keepalive bookkeeping.
// NOTE(review): field meanings below are inferred from names and types only;
// the code that drives them is outside this view — confirm against the run loop.
#[derive(Debug)]
struct KeepaliveRuntime {
    /// Same quantity as `SessionKeepaliveConfig::ping_interval`.
    ping_interval: Duration,
    /// Same quantity as `SessionKeepaliveConfig::pong_timeout`.
    pong_timeout: Duration,
    /// Presumably the instant at which the next ping is due.
    next_ping_at: vox_types::time::tokio::Instant,
    /// Presumably the nonce of the ping still awaiting a pong, if any.
    waiting_pong_nonce: Option<u64>,
    /// Presumably the instant by which the awaited pong must arrive.
    pong_deadline: vox_types::time::tokio::Instant,
    /// Presumably the nonce to use for the next ping.
    next_ping_nonce: u64,
}
531
// r[impl connection]
/// Static data for one active connection.
///
/// Stored in the session's connection table inside `ConnectionSlot::Active`.
#[derive(Debug)]
pub struct ConnectionState {
    /// Unique connection identifier
    pub id: ConnectionId,

    /// Our settings
    pub local_settings: ConnectionSettings,

    /// The peer's settings
    pub peer_settings: ConnectionSettings,

    /// Sender for routing incoming messages to the per-connection driver task.
    conn_tx: mpsc::Sender<RecvMessage>,
    /// Watch sender carrying the close reason; `None` while the connection is
    /// open. `ConnectionHandle::closed_rx` is a receiver of the same value type.
    closed_tx: watch::Sender<Option<ConnectionCloseReason>>,

    /// Per-connection schema recv tracker — schemas are scoped to a connection.
    schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
}
552
/// One entry in the session's connection table.
#[derive(Debug)]
enum ConnectionSlot {
    /// An established connection together with its routing state.
    Active(ConnectionState),
    /// An outbound open whose requester is still waiting for a result.
    PendingOutbound(PendingOutboundData),
}
558
559/// Debug-printable wrapper that omits the oneshot sender.
560struct PendingOutboundData {
561    local_settings: ConnectionSettings,
562    result_tx: Option<moire::sync::oneshot::Sender<Result<ConnectionHandle, SessionError>>>,
563}
564
565impl std::fmt::Debug for PendingOutboundData {
566    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
567        f.debug_struct("PendingOutbound")
568            .field("local_settings", &self.local_settings)
569            .finish()
570    }
571}
572
/// Cloneable sending half for a single connection.
///
/// Every outgoing message is stamped with `connection_id` and pushed through
/// the shared `SessionCore`.
#[derive(Clone)]
pub(crate) struct ConnectionSender {
    /// ID stamped onto every message sent through this sender.
    connection_id: ConnectionId,
    /// Shared session core that performs the actual send.
    pub(crate) sess_core: Arc<SessionCore>,
    /// Channel used by `mark_failure` to report failed requests; a receiver
    /// of the same item type lives on `ConnectionHandle::failures_rx`.
    failures: Arc<mpsc::UnboundedSender<(RequestId, FailureDisposition)>>,
}
579
580fn forwarded_payload<'a>(payload: &'a vox_types::Payload<'a>) -> vox_types::Payload<'a> {
581    let vox_types::Payload::PostcardBytes(bytes) = payload else {
582        unreachable!("proxy forwarding expects decoded incoming payload bytes")
583    };
584    vox_types::Payload::PostcardBytes(bytes)
585}
586
587fn forwarded_request_body<'a>(body: &'a RequestBody<'a>) -> RequestBody<'a> {
588    match body {
589        RequestBody::Call(call) => RequestBody::Call(vox_types::RequestCall {
590            method_id: call.method_id,
591            metadata: call.metadata.clone(),
592            args: forwarded_payload(&call.args),
593            schemas: call.schemas.clone(),
594        }),
595        RequestBody::Response(response) => RequestBody::Response(RequestResponse {
596            metadata: response.metadata.clone(),
597            ret: forwarded_payload(&response.ret),
598            schemas: response.schemas.clone(),
599        }),
600        RequestBody::Cancel(cancel) => RequestBody::Cancel(vox_types::RequestCancel {
601            metadata: cancel.metadata.clone(),
602        }),
603    }
604}
605
606fn forwarded_channel_body<'a>(body: &'a vox_types::ChannelBody<'a>) -> vox_types::ChannelBody<'a> {
607    match body {
608        vox_types::ChannelBody::Item(item) => {
609            vox_types::ChannelBody::Item(vox_types::ChannelItem {
610                item: forwarded_payload(&item.item),
611            })
612        }
613        vox_types::ChannelBody::Close(close) => {
614            vox_types::ChannelBody::Close(vox_types::ChannelClose {
615                metadata: close.metadata.clone(),
616            })
617        }
618        vox_types::ChannelBody::Reset(reset) => {
619            vox_types::ChannelBody::Reset(vox_types::ChannelReset {
620                metadata: reset.metadata.clone(),
621            })
622        }
623        vox_types::ChannelBody::GrantCredit(credit) => {
624            vox_types::ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
625                additional: credit.additional,
626            })
627        }
628    }
629}
630
631impl ConnectionSender {
632    pub(crate) fn connection_id(&self) -> ConnectionId {
633        self.connection_id
634    }
635
636    pub(crate) async fn send_with_binder<'a>(
637        &self,
638        msg: ConnectionMessage<'a>,
639        binder: Option<&'a dyn vox_types::ChannelBinder>,
640    ) -> Result<(), ()> {
641        let payload = match msg {
642            ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
643            ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
644        };
645        let message = Message {
646            connection_id: self.connection_id,
647            payload,
648        };
649        self.sess_core
650            .send(message, binder, None)
651            .await
652            .map_err(|_| ())
653    }
654
655    /// Send an arbitrary connection message
656    pub async fn send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), ()> {
657        self.send_with_binder(msg, None).await
658    }
659
660    // r[impl rpc.flow-control.credit.try-send]
661    pub(crate) fn try_send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), TrySendError<()>> {
662        let payload = match msg {
663            ConnectionMessage::Request(r) => MessagePayload::RequestMessage(r),
664            ConnectionMessage::Channel(c) => MessagePayload::ChannelMessage(c),
665        };
666        self.sess_core.try_send(
667            Message {
668                connection_id: self.connection_id,
669                payload,
670            },
671            None,
672            None,
673        )
674    }
675
676    /// Send a received connection message without re-materializing payload values.
677    pub(crate) async fn send_owned(
678        &self,
679        schemas: Arc<vox_types::SchemaRecvTracker>,
680        msg: SelfRef<ConnectionMessage<'static>>,
681    ) -> Result<(), ()> {
682        let msg_ref = msg.get();
683        let payload = match msg_ref {
684            ConnectionMessage::Request(request) => MessagePayload::RequestMessage(RequestMessage {
685                id: request.id,
686                body: forwarded_request_body(&request.body),
687            }),
688            ConnectionMessage::Channel(channel) => MessagePayload::ChannelMessage(ChannelMessage {
689                id: channel.id,
690                body: forwarded_channel_body(&channel.body),
691            }),
692        };
693
694        self.sess_core
695            .send(
696                Message {
697                    connection_id: self.connection_id,
698                    payload,
699                },
700                None,
701                Some(&*schemas),
702            )
703            .await
704            .map_err(|_| ())
705    }
706
707    /// Send a response specifically
708    pub async fn send_response<'a>(
709        &self,
710        request_id: RequestId,
711        response: RequestResponse<'a>,
712    ) -> Result<(), ()> {
713        self.send(ConnectionMessage::Request(RequestMessage {
714            id: request_id,
715            body: RequestBody::Response(response),
716        }))
717        .await
718    }
719
720    /// Shape a response using an explicit method ID, then send it.
721    pub async fn send_response_for_method<'a>(
722        &self,
723        request_id: RequestId,
724        method_id: vox_types::MethodId,
725        mut response: RequestResponse<'a>,
726    ) -> Result<(), ()> {
727        self.prepare_response_for_method(request_id, method_id, &mut response);
728        self.send(ConnectionMessage::Request(RequestMessage {
729            id: request_id,
730            body: RequestBody::Response(response),
731        }))
732        .await
733    }
734
735    /// Shape a response using an explicit method ID without sending it yet.
736    pub(crate) fn prepare_response_for_method(
737        &self,
738        request_id: RequestId,
739        method_id: vox_types::MethodId,
740        response: &mut RequestResponse<'_>,
741    ) {
742        self.sess_core.prepare_response_for_method(
743            self.connection_id,
744            request_id,
745            method_id,
746            response,
747        );
748    }
749
750    /// Shape a response using an explicit canonical root type and schema source.
751    pub(crate) fn prepare_response_from_source(
752        &self,
753        request_id: RequestId,
754        method_id: vox_types::MethodId,
755        root_type: &vox_types::TypeRef,
756        source: &dyn vox_types::SchemaSource,
757        response: &mut RequestResponse<'_>,
758    ) {
759        self.sess_core.prepare_response_from_source(
760            self.connection_id,
761            request_id,
762            method_id,
763            root_type,
764            source,
765            response,
766        );
767    }
768
769    /// Mark a request as failed by removing any pending response slot.
770    /// Called when a send error occurs or no reply was sent.
771    pub fn mark_failure(&self, request_id: RequestId, disposition: FailureDisposition) {
772        let _ = self.failures.send((request_id, disposition));
773    }
774
775    /// Prepare schemas for a replay response from the running code's
776    /// static response shape — same source of truth fresh responses
777    /// use, with the same connection-scoped dedup.
778    pub fn prepare_replay_schemas(
779        &self,
780        request_id: RequestId,
781        method_id: vox_types::MethodId,
782        response_shape: &'static Shape,
783        response: &mut RequestResponse<'_>,
784    ) {
785        self.sess_core.prepare_response_from_shape(
786            self.connection_id,
787            request_id,
788            method_id,
789            response_shape,
790            response,
791        );
792    }
793}
794
/// Everything needed to operate one connection: the sending half, the inbound
/// message queue, and the watch channels that report close/resume events.
pub struct ConnectionHandle {
    /// Sending half, stamped with this connection's ID.
    pub(crate) sender: ConnectionSender,
    /// Inbound messages routed to this connection.
    pub(crate) rx: mpsc::Receiver<RecvMessage>,
    /// Receives `(RequestId, FailureDisposition)` failure reports.
    pub(crate) failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
    /// Control channel, when available; used to request closure of this
    /// connection if a `PendingConnection` is dropped unconsumed.
    pub(crate) control_tx: Option<mpsc::UnboundedSender<DropControlRequest>>,
    /// Holds `Some(reason)` once the connection has closed, `None` while open.
    pub(crate) closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
    /// Watch receiver carrying a `u64`.
    // NOTE(review): presumably a counter that changes on each session resume —
    // confirm against the session run loop.
    pub(crate) resumed_rx: watch::Receiver<u64>,
    /// Our settings for this connection.
    pub(crate) local_settings: ConnectionSettings,
    /// The peer's settings for this connection.
    pub(crate) peer_settings: ConnectionSettings,
    /// The parity this side should use for allocating request/channel IDs.
    pub parity: Parity,
    /// Whether the peer supports retry (see `peer_supports_retry()`).
    pub(crate) peer_supports_retry: bool,
    /// Optional observer handle.
    pub(crate) observer: Option<VoxObserverHandle>,
}
809
810impl std::fmt::Debug for ConnectionHandle {
811    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812        f.debug_struct("ConnectionHandle")
813            .field("connection_id", &self.sender.connection_id)
814            .finish()
815    }
816}
817
/// A message scoped to a single connection: either request traffic or
/// channel traffic. The connection ID itself is added when the message is
/// wrapped into a `Message` for sending.
pub(crate) enum ConnectionMessage<'payload> {
    /// Request-family message.
    Request(RequestMessage<'payload>),
    /// Channel-family message.
    Channel(ChannelMessage<'payload>),
}

// NOTE(review): presumably generates the reborrow impl that lets
// `ConnectionMessage` be stored in a `SelfRef` — confirm against `vox_types`.
vox_types::impl_reborrow!(ConnectionMessage);
824
/// A message routed to a driver, carrying the `SchemaRecvTracker` that was
/// current when the session received it. This ensures each message uses the
/// correct tracker even across reconnections.
pub(crate) struct RecvMessage {
    /// Schema tracker that was current when the message was received.
    pub schemas: Arc<vox_types::SchemaRecvTracker>,
    /// The received request or channel message.
    pub msg: SelfRef<ConnectionMessage<'static>>,
}
832
833impl ConnectionHandle {
834    /// Returns the connection ID for this handle.
835    pub fn connection_id(&self) -> ConnectionId {
836        self.sender.connection_id
837    }
838
839    /// Resolve when this connection closes.
840    pub async fn closed(&self) {
841        if self.closed_rx.borrow().is_some() {
842            return;
843        }
844        let mut rx = self.closed_rx.clone();
845        while rx.changed().await.is_ok() {
846            if rx.borrow().is_some() {
847                return;
848            }
849        }
850    }
851
852    /// Return whether this connection is still considered connected.
853    pub fn is_connected(&self) -> bool {
854        self.closed_rx.borrow().is_none()
855    }
856
857    pub fn close_reason(&self) -> Option<ConnectionCloseReason> {
858        *self.closed_rx.borrow()
859    }
860
861    pub fn peer_supports_retry(&self) -> bool {
862        self.peer_supports_retry
863    }
864
865    // r[impl rpc.debug.snapshot]
866    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
867        let (outbound_queue_depth, outbound_queue_capacity) =
868            self.sender.sess_core.outbound_queue_stats();
869        VoxDebugSnapshot {
870            connections: vec![ConnectionDebugSnapshot {
871                connection_id: self.connection_id(),
872                endpoint: None,
873                surface: None,
874                component: None,
875                state: if self.closed_rx.borrow().is_some() {
876                    ConnectionDebugState::Closed
877                } else {
878                    ConnectionDebugState::Open
879                },
880                outstanding_requests: 0,
881                requests: Vec::new(),
882                open_channels: Vec::new(),
883                outbound_queue_depth: Some(outbound_queue_depth),
884                outbound_queue_capacity: Some(outbound_queue_capacity),
885                local_control_queue_depth: None,
886                local_control_queue_capacity: None,
887                last_inbound_message_at: None,
888                last_outbound_message_at: None,
889                last_progress_at: None,
890                close_reason: *self.closed_rx.borrow(),
891                driver_task_status: DriverTaskStatus::Unknown,
892            }],
893        }
894    }
895
896    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
897        let snapshot = self.debug_snapshot();
898        tracing::info!(?snapshot, "vox debug snapshot");
899        snapshot
900    }
901}
902
903/// Forward all request/channel traffic between two connections.
904///
905/// This is a protocol-level bridge: it does not inspect service schemas or method IDs.
906/// It exits when either side closes or a forward send fails, then requests closure of
907/// both underlying connections.
908pub async fn proxy_connections(
909    left: ConnectionHandle,
910    right: ConnectionHandle,
911) -> Result<(), SessionError> {
912    if left.parity == right.parity {
913        return Err(SessionError::Protocol(
914            "proxy_connections requires opposite parities".into(),
915        ));
916    }
917    let left_conn_id = left.connection_id();
918    let right_conn_id = right.connection_id();
919    let ConnectionHandle {
920        sender: left_sender,
921        rx: mut left_rx,
922        failures_rx: _left_failures_rx,
923        control_tx: left_control_tx,
924        closed_rx: _left_closed_rx,
925        resumed_rx: _left_resumed_rx,
926        local_settings: _left_local_settings,
927        peer_settings: _left_peer_settings,
928        parity: _left_parity,
929        peer_supports_retry: _left_peer_supports_retry,
930        observer: _left_observer,
931    } = left;
932    let ConnectionHandle {
933        sender: right_sender,
934        rx: mut right_rx,
935        failures_rx: _right_failures_rx,
936        control_tx: right_control_tx,
937        closed_rx: _right_closed_rx,
938        resumed_rx: _right_resumed_rx,
939        local_settings: _right_local_settings,
940        peer_settings: _right_peer_settings,
941        parity: _right_parity,
942        peer_supports_retry: _right_peer_supports_retry,
943        observer: _right_observer,
944    } = right;
945
946    loop {
947        tokio::select! {
948            recv = left_rx.recv() => {
949                let Some(recv) = recv else {
950                    break;
951                };
952                if right_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
953                    break;
954                }
955            }
956            recv = right_rx.recv() => {
957                let Some(recv) = recv else {
958                    break;
959                };
960                if left_sender.send_owned(recv.schemas, recv.msg).await.is_err() {
961                    break;
962                }
963            }
964        }
965    }
966
967    if let Some(tx) = left_control_tx.as_ref() {
968        let _ = send_drop_control(tx, DropControlRequest::Close(left_conn_id));
969    }
970    if let Some(tx) = right_control_tx.as_ref() {
971        let _ = send_drop_control(tx, DropControlRequest::Close(right_conn_id));
972    }
973    Ok(())
974}
975
/// Errors that can occur during session establishment or operation.
#[derive(Debug)]
pub enum SessionError {
    /// An underlying I/O failure.
    Io(std::io::Error),
    /// A protocol violation or invalid session state, with a human-readable description.
    Protocol(String),
    /// The connection was rejected; carries the accompanying metadata.
    Rejected(Metadata<'static>),
    /// The session is not resumable (for example, resumption was requested but the
    /// handshake produced no resume key).
    NotResumable,
    /// The connection attempt timed out.
    ConnectTimeout,
}
985
986impl SessionError {
987    /// Returns `true` if a retry of the same connection attempt may succeed.
988    ///
989    /// I/O errors and timeouts are transient — the remote might become available
990    /// shortly. Protocol errors and explicit rejections are permanent for this
991    /// peer address and will not resolve by retrying.
992    pub fn is_retryable(&self) -> bool {
993        matches!(
994            self,
995            Self::Io(_) | Self::ConnectTimeout | Self::NotResumable
996        )
997    }
998}
999
1000impl std::fmt::Display for SessionError {
1001    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1002        match self {
1003            Self::Io(e) => write!(f, "io error: {e}"),
1004            Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
1005            Self::Rejected(_) => write!(f, "connection rejected"),
1006            Self::NotResumable => write!(f, "session is not resumable"),
1007            Self::ConnectTimeout => write!(f, "connect timeout"),
1008        }
1009    }
1010}
1011
// Relies on the default `std::error::Error` methods, so `source()` returns `None`.
impl std::error::Error for SessionError {}
1013
1014fn classify_session_recv_error(error: &std::io::Error) -> ConnectionCloseReason {
1015    let message = error.to_string();
1016    if message.contains("decode error") || message.contains("protocol") {
1017        ConnectionCloseReason::Protocol
1018    } else {
1019        ConnectionCloseReason::Transport
1020    }
1021}
1022
1023fn classify_decode_error(error: &std::io::Error) -> Option<DecodeErrorKind> {
1024    let message = error.to_string();
1025    if message.contains("decode error") {
1026        Some(DecodeErrorKind::Payload)
1027    } else {
1028        None
1029    }
1030}
1031
1032impl Session {
1033    // r[impl rpc.observability.session-errors]
1034    // r[impl rpc.observability.driver]
1035    fn observe_session_recv_error(&self, error: &std::io::Error) {
1036        let Some(observer) = &self.observer else {
1037            return;
1038        };
1039
1040        if let Some(kind) = classify_decode_error(error) {
1041            for conn_id in self.conns.iter().filter_map(|(conn_id, slot)| {
1042                matches!(slot, ConnectionSlot::Active(_)).then_some(*conn_id)
1043            }) {
1044                observer.driver_event(vox_types::DriverEvent::DecodeError {
1045                    connection_id: conn_id,
1046                    kind,
1047                });
1048            }
1049            return;
1050        }
1051
1052        observer.transport_event(vox_types::TransportEvent::Closed {
1053            connection_id: None,
1054            reason: classify_session_recv_error(error),
1055        });
1056    }
1057
    /// Close `conn_id` because the peer violated the protocol.
    ///
    /// Logs `detail` as a warning, removes the connection with
    /// `ConnectionCloseReason::Protocol`, then calls
    /// `maybe_request_shutdown_after_root_closed`.
    fn close_connection_for_protocol_error(
        &mut self,
        conn_id: ConnectionId,
        detail: impl std::fmt::Display,
    ) {
        warn!(%conn_id, "closing connection after protocol error: {detail}");
        self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Protocol);
        self.maybe_request_shutdown_after_root_closed();
    }
1067
1068    fn record_received_schema_cbor(
1069        &mut self,
1070        conn_id: ConnectionId,
1071        schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
1072        method_id: vox_types::MethodId,
1073        direction: vox_types::BindingDirection,
1074        schemas_cbor: &vox_types::CborPayload,
1075        context: &str,
1076    ) -> bool {
1077        let payload = match vox_types::SchemaPayload::from_cbor(&schemas_cbor.0) {
1078            Ok(payload) => payload,
1079            Err(error) => {
1080                self.close_connection_for_protocol_error(
1081                    conn_id,
1082                    format!("{context}: invalid schema CBOR: {error}"),
1083                );
1084                return false;
1085            }
1086        };
1087
1088        if let Err(error) = schema_recv_tracker.record_received(method_id, direction, payload) {
1089            self.close_connection_for_protocol_error(conn_id, format!("{context}: {error}"));
1090            return false;
1091        }
1092
1093        true
1094    }
1095
1096    #[allow(clippy::too_many_arguments)]
1097    fn pre_handshake<Tx, Rx>(
1098        tx: Tx,
1099        rx: Rx,
1100        on_connection: Option<Arc<dyn ConnectionAcceptor>>,
1101        open_rx: mpsc::Receiver<OpenRequest>,
1102        close_rx: mpsc::Receiver<CloseRequest>,
1103        resume_rx: mpsc::Receiver<ResumeRequest>,
1104        control_tx: mpsc::UnboundedSender<DropControlRequest>,
1105        control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
1106        keepalive: Option<SessionKeepaliveConfig>,
1107        resumable: bool,
1108        recoverer: Option<Box<dyn ConduitRecoverer>>,
1109        recovery_timeout: Option<Duration>,
1110        observer: Option<VoxObserverHandle>,
1111    ) -> Self
1112    where
1113        Tx: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
1114        Rx: ConduitRx<Msg = MessageFamily> + MaybeSend + 'static,
1115    {
1116        let (outbound_tx, outbound_rx) = tokio_mpsc::channel(256);
1117        let sess_core = Arc::new(SessionCore {
1118            inner: std::sync::Mutex::new(SessionCoreInner {
1119                tx: Arc::new(tx) as Arc<dyn DynConduitTx>,
1120                conns: HashMap::new(),
1121            }),
1122            outbound_tx,
1123            observer: observer.clone(),
1124        });
1125        spawn_outbound_worker(outbound_rx);
1126        let (resume_notifier, _resume_rx) = watch::channel(0_u64);
1127        Session {
1128            rx: Box::new(rx),
1129            role: SessionRole::Initiator, // overwritten in establish_as_*
1130            parity: Parity::Odd,          // overwritten in establish_as_*
1131            sess_core,
1132            peer_supports_retry: false,
1133            local_root_settings: ConnectionSettings {
1134                parity: Parity::Odd,
1135                max_concurrent_requests: 64,
1136                initial_channel_credit: 16,
1137            },
1138            peer_root_settings: None,
1139            resumable,
1140            session_resume_key: None,
1141            conns: BTreeMap::new(),
1142            root_closed_internal: false,
1143            conn_ids: IdAllocator::new(Parity::Odd), // overwritten in establish_as_*
1144            on_connection,
1145            open_rx,
1146            close_rx,
1147            resume_rx,
1148            control_tx,
1149            control_rx,
1150            keepalive,
1151            resume_notifier,
1152            recoverer,
1153            recovery_timeout,
1154            registered_in_registry: false,
1155            observer,
1156        }
1157    }
1158
    /// Returns the session resume key currently stored on this session, if any.
    pub(crate) fn resume_key(&self) -> Option<SessionResumeKey> {
        self.session_resume_key
    }
1162
1163    // r[impl session.handshake]
1164    fn establish_from_handshake(
1165        &mut self,
1166        result: HandshakeResult,
1167    ) -> Result<ConnectionHandle, SessionError> {
1168        self.role = result.role;
1169        self.parity = result.our_settings.parity;
1170        self.conn_ids = IdAllocator::new(result.our_settings.parity);
1171        self.local_root_settings = result.our_settings.clone();
1172        self.peer_root_settings = Some(result.peer_settings.clone());
1173        self.peer_supports_retry = result.peer_supports_retry;
1174        self.session_resume_key = result.session_resume_key;
1175
1176        if self.resumable && self.session_resume_key.is_none() {
1177            return Err(SessionError::NotResumable);
1178        }
1179
1180        Ok(self.make_root_handle(result.our_settings, result.peer_settings))
1181    }
1182
    /// Create the handle for the root connection (`ConnectionId::ROOT`) with the
    /// given settings. Thin wrapper around `make_connection_handle`.
    fn make_root_handle(
        &mut self,
        local_settings: ConnectionSettings,
        peer_settings: ConnectionSettings,
    ) -> ConnectionHandle {
        self.make_connection_handle(ConnectionId::ROOT, local_settings, peer_settings)
    }
1190
1191    fn make_connection_handle(
1192        &mut self,
1193        conn_id: ConnectionId,
1194        local_settings: ConnectionSettings,
1195        peer_settings: ConnectionSettings,
1196    ) -> ConnectionHandle {
1197        let label = format!("session.conn{}", conn_id.0);
1198        let (conn_tx, conn_rx) = mpsc::channel::<RecvMessage>(&label, 64);
1199        let (failures_tx, failures_rx) = mpsc::unbounded_channel(format!("{label}.failures"));
1200        let (closed_tx, closed_rx) = watch::channel(None);
1201        let resumed_rx = self.resume_notifier.subscribe();
1202
1203        let sender = ConnectionSender {
1204            connection_id: conn_id,
1205            sess_core: Arc::clone(&self.sess_core),
1206            failures: Arc::new(failures_tx),
1207        };
1208
1209        let parity = local_settings.parity;
1210        let handle_local_settings = local_settings.clone();
1211        let handle_peer_settings = peer_settings.clone();
1212        trace!(%conn_id, "make_connection_handle: inserting slot into conns");
1213        if let Some(observer) = &self.observer {
1214            observer.driver_event(vox_types::DriverEvent::ConnectionOpened {
1215                connection_id: conn_id,
1216            });
1217        }
1218        self.conns.insert(
1219            conn_id,
1220            ConnectionSlot::Active(ConnectionState {
1221                id: conn_id,
1222                local_settings,
1223                peer_settings,
1224                conn_tx,
1225                closed_tx,
1226                schema_recv_tracker: Arc::new(vox_types::SchemaRecvTracker::new()),
1227            }),
1228        );
1229
1230        ConnectionHandle {
1231            sender,
1232            rx: conn_rx,
1233            failures_rx,
1234            control_tx: Some(self.control_tx.clone()),
1235            closed_rx,
1236            resumed_rx,
1237            local_settings: handle_local_settings,
1238            peer_settings: handle_peer_settings,
1239            parity,
1240            peer_supports_retry: self.peer_supports_retry,
1241            observer: self.observer.clone(),
1242        }
1243    }
1244
    /// Run the session recv loop: read from the conduit, demux by connection
    /// ID, and route to the appropriate connection's driver. Also processes
    /// open/close requests from the SessionHandle.
    ///
    /// The loop ends when the conduit breaks and cannot be recovered, when a
    /// drop-control request or a keepalive tick asks for shutdown. All connections
    /// are closed before this method returns.
    // r[impl zerocopy.framing.pipeline.incoming]
    pub async fn run(&mut self) {
        let mut keepalive_runtime = self.make_keepalive_runtime();
        // A tick source exists only when a keepalive runtime was created.
        let mut keepalive_tick = keepalive_runtime.as_ref().map(|_| {
            let mut interval = vox_types::time::tokio::interval(Duration::from_millis(10));
            interval.set_missed_tick_behavior(vox_types::time::tokio::MissedTickBehavior::Delay);
            interval
        });

        loop {
            tokio::select! {
                // biased: ensure conduit EOF is processed before any resume
                // request. Without this, tokio's random branch selection can
                // pick resume_rx when BOTH branches are simultaneously ready
                // (fast client reconnect on Linux), causing the session to
                // reject a valid resume while still in CONNECTED state.
                biased;

                msg = self.rx.recv_msg() => {
                    vox_types::dlog!("[session {:?}] recv_msg returned", self.role);
                    match msg {
                        Ok(Some(msg)) => {
                            self.handle_message(msg, &mut keepalive_runtime).await;
                        }
                        // Clean EOF: try to recover; otherwise close as remote-initiated.
                        Ok(None) => {
                            vox_types::dlog!("[session {:?}] recv loop: conduit returned EOF", self.role);
                            if !self.handle_conduit_break(&mut keepalive_runtime).await {
                                vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                                self.close_all_connections(ConnectionCloseReason::Remote);
                                break;
                            }
                        }
                        // Receive failure: classify and report it first, then try to recover.
                        Err(error) => {
                            let close_reason = classify_session_recv_error(&error);
                            self.observe_session_recv_error(&error);
                            warn!(
                                role = ?self.role,
                                %error,
                                ?close_reason,
                                "session receive failed; closing connections if recovery is unavailable"
                            );
                            vox_types::dlog!("[session {:?}] recv loop: conduit recv error: {}", self.role, error);
                            if !self.handle_conduit_break(&mut keepalive_runtime).await {
                                vox_types::dlog!("[session {:?}] recv loop: breaking (not resumable)", self.role);
                                self.close_all_connections(close_reason);
                                break;
                            }
                        }
                    }
                }
                Some(req) = self.open_rx.recv() => {
                    self.handle_open_request(req).await;
                }
                Some(req) = self.close_rx.recv() => {
                    self.handle_close_request(req).await;
                }
                // Resume requests are rejected here; they are only served while
                // `handle_conduit_break` is waiting after the conduit has broken.
                Some(req) = self.resume_rx.recv() => {
                    let _ = req.result_tx.send(Err(SessionError::Protocol(
                        "resume is only valid while the session is disconnected".into(),
                    )));
                }
                Some(req) = self.control_rx.recv() => {
                    if !self.handle_drop_control_request(req).await {
                        self.close_all_connections(ConnectionCloseReason::Local);
                        break;
                    }
                }
                // Keepalive branch; the guard disables it when no tick source exists.
                _ = async {
                    if let Some(interval) = keepalive_tick.as_mut() {
                        interval.tick().await;
                    }
                }, if keepalive_tick.is_some() => {
                    if !self.handle_keepalive_tick(&mut keepalive_runtime).await {
                        self.close_all_connections(ConnectionCloseReason::Protocol);
                        break;
                    }
                }
            }
        }

        // Drop all connection slots so per-connection drivers exit immediately.
        self.close_all_connections(ConnectionCloseReason::SessionShutdown);
        trace!("session recv loop exited");
    }
1332
1333    async fn handle_conduit_break(
1334        &mut self,
1335        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1336    ) -> bool {
1337        // Recovery strategy:
1338        // 1. If we have a recoverer (client-side auto-reconnect), use it.
1339        // 2. If we're registered in a SessionRegistry (server-side), wait
1340        //    for the registry to route a reconnecting client to us.
1341        // 3. Otherwise, the session is done.
1342
1343        if let Some(recoverer) = self.recoverer.as_mut() {
1344            let recovery_fut = recoverer.next_conduit(self.session_resume_key.as_ref());
1345            let recovery_result = match self.recovery_timeout {
1346                Some(timeout) => match vox_types::time::tokio::timeout(timeout, recovery_fut).await
1347                {
1348                    Ok(r) => r,
1349                    Err(_) => return false,
1350                },
1351                None => recovery_fut.await,
1352            };
1353            match recovery_result {
1354                Ok(recovered) => {
1355                    let result =
1356                        self.resume_from_handshake(recovered.tx, recovered.rx, recovered.handshake);
1357                    match result {
1358                        Ok(()) => {
1359                            let next_generation = self.resume_notifier.borrow().wrapping_add(1);
1360                            let _ = self.resume_notifier.send(next_generation);
1361                            *keepalive_runtime = self.make_keepalive_runtime();
1362                            return true;
1363                        }
1364                        Err(_) => return false,
1365                    }
1366                }
1367                Err(_) => return false,
1368            }
1369        }
1370
1371        if !self.registered_in_registry {
1372            return false;
1373        }
1374
1375        loop {
1376            tokio::select! {
1377                Some(req) = self.resume_rx.recv() => {
1378                    let result =
1379                        self.resume_from_handshake(req.tx, req.rx, req.handshake_result);
1380                    let ok = result.is_ok();
1381                    let _ = req.result_tx.send(result);
1382                    if ok {
1383                        let next_generation = self.resume_notifier.borrow().wrapping_add(1);
1384                        let _ = self.resume_notifier.send(next_generation);
1385                        *keepalive_runtime = self.make_keepalive_runtime();
1386                        return true;
1387                    }
1388                }
1389                Some(req) = self.control_rx.recv() => {
1390                    if !self.handle_drop_control_request(req).await {
1391                        return false;
1392                    }
1393                }
1394                Some(req) = self.open_rx.recv() => {
1395                    let _ = req.result_tx.send(Err(SessionError::Protocol(
1396                        "session is disconnected; resume before opening connections".into(),
1397                    )));
1398                }
1399                Some(req) = self.close_rx.recv() => {
1400                    let _ = req.result_tx.send(Err(SessionError::Protocol(
1401                        "session is disconnected; resume before closing connections".into(),
1402                    )));
1403                }
1404                else => return false,
1405            }
1406        }
1407    }
1408
1409    // r[impl session.handshake.resume]
1410    fn resume_from_handshake(
1411        &mut self,
1412        tx: Arc<dyn DynConduitTx>,
1413        rx: Box<dyn DynConduitRx>,
1414        result: HandshakeResult,
1415    ) -> Result<(), SessionError> {
1416        let Some(peer_settings) = self.peer_root_settings.clone() else {
1417            return Err(SessionError::Protocol("missing peer root settings".into()));
1418        };
1419
1420        if result.our_settings != self.local_root_settings {
1421            return Err(SessionError::Protocol(
1422                "local root settings changed across session resume".into(),
1423            ));
1424        }
1425
1426        if result.peer_settings != peer_settings {
1427            return Err(SessionError::Protocol(
1428                "peer root settings changed across session resume".into(),
1429            ));
1430        }
1431
1432        self.peer_supports_retry = result.peer_supports_retry;
1433        self.session_resume_key = result.session_resume_key.or(self.session_resume_key);
1434
1435        self.sess_core.replace_tx_and_reset_schemas(tx);
1436        self.rx = rx;
1437        // Reset the root connection's recv tracker on reconnection —
1438        // type IDs are per-connection and must not carry over.
1439        if let Some(ConnectionSlot::Active(state)) = self.conns.get_mut(&ConnectionId::ROOT) {
1440            state.schema_recv_tracker = Arc::new(vox_types::SchemaRecvTracker::new());
1441        }
1442        Ok(())
1443    }
1444
1445    async fn handle_message(
1446        &mut self,
1447        msg: SelfRef<Message<'static>>,
1448        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1449    ) {
1450        let msg_ref = msg.get();
1451        let conn_id = msg_ref.connection_id;
1452        match &msg_ref.payload {
1453            MessagePayload::Ping(ping) => {
1454                let _ = self
1455                    .sess_core
1456                    .send(
1457                        Message {
1458                            connection_id: conn_id,
1459                            payload: MessagePayload::Pong(vox_types::Pong { nonce: ping.nonce }),
1460                        },
1461                        None,
1462                        None,
1463                    )
1464                    .await;
1465                return;
1466            }
1467            MessagePayload::Pong(pong) => {
1468                if conn_id.is_root() {
1469                    self.handle_keepalive_pong(pong.nonce, keepalive_runtime);
1470                }
1471                return;
1472            }
1473            MessagePayload::SchemaMessage(schema_msg) => {
1474                let schema_recv_tracker = match self.conns.get(&conn_id) {
1475                    Some(ConnectionSlot::Active(state)) => Arc::clone(&state.schema_recv_tracker),
1476                    _ => return,
1477                };
1478                let _ = self.record_received_schema_cbor(
1479                    conn_id,
1480                    schema_recv_tracker,
1481                    schema_msg.method_id,
1482                    schema_msg.direction,
1483                    &schema_msg.schemas,
1484                    "standalone schema message",
1485                );
1486                return;
1487            }
1488            _ => {}
1489        }
1490        vox_types::selfref_match!(msg, payload {
1491            // r[impl connection.close.semantics]
1492            MessagePayload::ConnectionClose(_) => {
1493                if conn_id.is_root() {
1494                    warn!("received ConnectionClose for root connection");
1495                } else {
1496                    trace!(conn_id = conn_id.0, "received ConnectionClose for virtual connection");
1497                }
1498                // Remove the connection — dropping conn_tx causes the Driver's rx
1499                // to return None, which exits its run loop. All in-flight handlers
1500                // are dropped, triggering DriverReplySink::drop → Cancelled responses.
1501                self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Remote);
1502                self.maybe_request_shutdown_after_root_closed();
1503            }
1504            MessagePayload::ConnectionOpen(open) => {
1505                self.handle_inbound_open(conn_id, open).await;
1506            }
1507            MessagePayload::ConnectionAccept(accept) => {
1508                self.handle_inbound_accept(conn_id, accept);
1509            }
1510            MessagePayload::ConnectionReject(reject) => {
1511                self.handle_inbound_reject(conn_id, reject);
1512            }
1513            MessagePayload::RequestMessage(r) => {
1514                let r_ref = r.get();
1515                vox_types::dlog!(
1516                    "[session {:?}] recv request: conn={:?} req={:?} body={} method={:?}",
1517                    self.role,
1518                    conn_id,
1519                    r_ref.id,
1520                    match &r_ref.body {
1521                        RequestBody::Call(_) => "Call",
1522                        RequestBody::Response(_) => "Response",
1523                        RequestBody::Cancel(_) => "Cancel",
1524                    },
1525                    match &r_ref.body {
1526                        RequestBody::Call(call) => Some(call.method_id),
1527                        RequestBody::Response(_) | RequestBody::Cancel(_) => None,
1528                    }
1529                );
1530                // Record any inlined schemas from the incoming request before routing
1531                let response_had_schema_payload = matches!(&r_ref.body, RequestBody::Response(resp) if !resp.schemas.is_empty());
1532                {
1533                    let schemas_cbor = match &r_ref.body {
1534                        RequestBody::Call(call) => Some(&call.schemas),
1535                        RequestBody::Response(resp) => Some(&resp.schemas),
1536                        _ => None,
1537                    };
1538                    vox_types::dlog!(
1539                        "[schema] recv ({:?}): req={:?} body={} schemas_len={:?}",
1540                        self.role,
1541                        r_ref.id,
1542                    match &r_ref.body {
1543                            RequestBody::Call(_) => "Call",
1544                            RequestBody::Response(_) => "Response",
1545                            RequestBody::Cancel(_) => "Cancel",
1546                        },
1547                        schemas_cbor.map(|s| s.0.len())
1548                    );
1549                    let schema_recv_tracker = match self.conns.get(&conn_id) {
1550                        Some(ConnectionSlot::Active(state)) => {
1551                            Arc::clone(&state.schema_recv_tracker)
1552                        }
1553                        _ => return,
1554                    };
1555                    if let Some(schemas_cbor) = schemas_cbor
1556                        && !schemas_cbor.is_empty()
1557                    {
1558                        let (method_id, direction) = match &r_ref.body {
1559                            RequestBody::Call(call) => {
1560                                (call.method_id, vox_types::BindingDirection::Args)
1561                            }
1562                            RequestBody::Response(_) => {
1563                                let Some(method_id) =
1564                                    self.sess_core.take_outgoing_call_method(conn_id, r_ref.id)
1565                                else {
1566                                    self.close_connection_for_protocol_error(
1567                                        conn_id,
1568                                        format!(
1569                                            "response schemas for unknown inflight request {:?}",
1570                                            r_ref.id
1571                                        ),
1572                                    );
1573                                    return;
1574                                };
1575                                (method_id, vox_types::BindingDirection::Response)
1576                            }
1577                            RequestBody::Cancel(_) => unreachable!(),
1578                        };
1579                        if !self.record_received_schema_cbor(
1580                            conn_id,
1581                            schema_recv_tracker,
1582                            method_id,
1583                            direction,
1584                            schemas_cbor,
1585                            "inlined request schemas",
1586                        ) {
1587                            return;
1588                        }
1589                    }
1590                }
1591                if matches!(&r_ref.body, RequestBody::Response(_)) && !response_had_schema_payload {
1592                    let _ = self.sess_core.take_outgoing_call_method(conn_id, r_ref.id);
1593                }
1594                // Record incoming calls so SessionCore::send() can look up
1595                // the method_id when sending the response.
1596                if let RequestBody::Call(call) = &r_ref.body {
1597                    self.sess_core.record_incoming_call(conn_id, r_ref.id, call.method_id);
1598                }
1599                let state = match self.conns.get(&conn_id) {
1600                    Some(ConnectionSlot::Active(state)) => state,
1601                    _ => return,
1602                };
1603                let conn_tx = state.conn_tx.clone();
1604                let request_id = r_ref.id;
1605                let body_kind = match &r_ref.body {
1606                    RequestBody::Call(_) => "Call",
1607                    RequestBody::Response(_) => "Response",
1608                    RequestBody::Cancel(_) => "Cancel",
1609                };
1610                let recv_msg = RecvMessage {
1611                    schemas: Arc::clone(&state.schema_recv_tracker),
1612                    msg: r.map(ConnectionMessage::Request),
1613                };
1614                vox_types::dlog!(
1615                    "[session {:?}] dispatch request: conn={:?} req={:?} body={}",
1616                    self.role,
1617                    conn_id,
1618                    request_id,
1619                    body_kind
1620                );
1621                if conn_tx.send(recv_msg).await.is_err() {
1622                    self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
1623                    self.maybe_request_shutdown_after_root_closed();
1624                }
1625            }
1626            MessagePayload::ChannelMessage(c) => {
1627                let state = match self.conns.get(&conn_id) {
1628                    Some(ConnectionSlot::Active(state)) => state,
1629                    _ => return,
1630                };
1631                let conn_tx = state.conn_tx.clone();
1632                let recv_msg = RecvMessage {
1633                    schemas: Arc::clone(&state.schema_recv_tracker),
1634                    msg: c.map(ConnectionMessage::Channel),
1635                };
1636                if conn_tx.send(recv_msg).await.is_err() {
1637                    self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
1638                    self.maybe_request_shutdown_after_root_closed();
1639                }
1640            }
1641            // ProtocolError: not valid post-handshake, drop.
1642        })
1643    }
1644
1645    fn make_keepalive_runtime(&self) -> Option<KeepaliveRuntime> {
1646        let config = self.keepalive?;
1647        if config.ping_interval.is_zero() || config.pong_timeout.is_zero() {
1648            warn!("keepalive disabled due to non-positive interval/timeout");
1649            return None;
1650        }
1651        let now = vox_types::time::tokio::Instant::now();
1652        Some(KeepaliveRuntime {
1653            ping_interval: config.ping_interval,
1654            pong_timeout: config.pong_timeout,
1655            next_ping_at: now + config.ping_interval,
1656            waiting_pong_nonce: None,
1657            pong_deadline: now,
1658            next_ping_nonce: 1,
1659        })
1660    }
1661
1662    fn handle_keepalive_pong(&self, nonce: u64, keepalive_runtime: &mut Option<KeepaliveRuntime>) {
1663        let Some(runtime) = keepalive_runtime.as_mut() else {
1664            return;
1665        };
1666        if runtime.waiting_pong_nonce != Some(nonce) {
1667            return;
1668        }
1669        runtime.waiting_pong_nonce = None;
1670        runtime.next_ping_at = vox_types::time::tokio::Instant::now() + runtime.ping_interval;
1671    }
1672
1673    async fn handle_keepalive_tick(
1674        &mut self,
1675        keepalive_runtime: &mut Option<KeepaliveRuntime>,
1676    ) -> bool {
1677        let Some(runtime) = keepalive_runtime.as_mut() else {
1678            return true;
1679        };
1680        let now = vox_types::time::tokio::Instant::now();
1681
1682        if let Some(waiting_nonce) = runtime.waiting_pong_nonce {
1683            if now >= runtime.pong_deadline {
1684                warn!(
1685                    nonce = waiting_nonce,
1686                    timeout_ms = runtime.pong_timeout.as_millis(),
1687                    "keepalive timeout waiting for pong"
1688                );
1689                return false;
1690            }
1691            return true;
1692        }
1693
1694        if now < runtime.next_ping_at {
1695            return true;
1696        }
1697
1698        let nonce = runtime.next_ping_nonce;
1699        if self
1700            .sess_core
1701            .send(
1702                Message {
1703                    connection_id: ConnectionId::ROOT,
1704                    payload: MessagePayload::Ping(vox_types::Ping { nonce }),
1705                },
1706                None,
1707                None,
1708            )
1709            .await
1710            .is_err()
1711        {
1712            warn!("failed to send keepalive ping");
1713            return false;
1714        }
1715
1716        runtime.waiting_pong_nonce = Some(nonce);
1717        runtime.pong_deadline = now + runtime.pong_timeout;
1718        runtime.next_ping_at = now + runtime.ping_interval;
1719        runtime.next_ping_nonce = runtime.next_ping_nonce.wrapping_add(1);
1720        true
1721    }
1722
1723    async fn handle_inbound_open(
1724        &mut self,
1725        conn_id: ConnectionId,
1726        open: SelfRef<ConnectionOpen<'static>>,
1727    ) {
1728        // Validate: connection ID must match peer's parity (opposite of ours).
1729        let peer_parity = self.parity.other();
1730        if !conn_id.has_parity(peer_parity) {
1731            // Protocol error: wrong parity. For now, just reject.
1732            let _ = self
1733                .sess_core
1734                .send(
1735                    Message {
1736                        connection_id: conn_id,
1737                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1738                            metadata: vec![],
1739                        }),
1740                    },
1741                    None,
1742                    None,
1743                )
1744                .await;
1745            return;
1746        }
1747
1748        // Validate: connection ID must not already be in use.
1749        if self.conns.contains_key(&conn_id) {
1750            // Protocol error: duplicate connection ID.
1751            let _ = self
1752                .sess_core
1753                .send(
1754                    Message {
1755                        connection_id: conn_id,
1756                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1757                            metadata: vec![],
1758                        }),
1759                    },
1760                    None,
1761                    None,
1762                )
1763                .await;
1764            return;
1765        }
1766
1767        // r[impl connection.open.rejection]
1768        // Call the acceptor callback. If none is registered, reject.
1769        if self.on_connection.is_none() {
1770            let _ = self
1771                .sess_core
1772                .send(
1773                    Message {
1774                        connection_id: conn_id,
1775                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1776                            metadata: vec![],
1777                        }),
1778                    },
1779                    None,
1780                    None,
1781                )
1782                .await;
1783            return;
1784        }
1785
1786        // Derive settings: opposite parity, same limits for now.
1787        let open = open.get();
1788        if open.connection_settings.initial_channel_credit == 0 {
1789            let _ = self
1790                .sess_core
1791                .send(
1792                    Message {
1793                        connection_id: conn_id,
1794                        payload: MessagePayload::ConnectionReject(vox_types::ConnectionReject {
1795                            metadata: vec![vox_types::MetadataEntry::str(
1796                                "error",
1797                                "initial_channel_credit must be greater than zero",
1798                            )],
1799                        }),
1800                    },
1801                    None,
1802                    None,
1803                )
1804                .await;
1805            return;
1806        }
1807
1808        let our_settings = ConnectionSettings {
1809            parity: open.connection_settings.parity.other(),
1810            max_concurrent_requests: open.connection_settings.max_concurrent_requests,
1811            initial_channel_credit: open.connection_settings.initial_channel_credit,
1812        };
1813
1814        // Create the connection handle and activate it.
1815        let handle = self.make_connection_handle(
1816            conn_id,
1817            our_settings.clone(),
1818            open.connection_settings.clone(),
1819        );
1820
1821        // Let the acceptor decide the connection's fate.
1822        let mut metadata: Vec<vox_types::MetadataEntry<'_>> = open.metadata.to_vec();
1823        metadata.push(vox_types::MetadataEntry::str(
1824            "vox-connection-kind",
1825            "virtual",
1826        ));
1827        let request = match ConnectionRequest::new(&metadata) {
1828            Ok(r) => r,
1829            Err(e) => {
1830                trace!(%conn_id, %e, "rejecting virtual connection");
1831                self.conns.remove(&conn_id);
1832                let _ = self
1833                    .sess_core
1834                    .send(
1835                        Message {
1836                            connection_id: conn_id,
1837                            payload: MessagePayload::ConnectionReject(
1838                                vox_types::ConnectionReject {
1839                                    metadata: vec![vox_types::MetadataEntry::str(
1840                                        "error",
1841                                        e.to_string(),
1842                                    )],
1843                                },
1844                            ),
1845                        },
1846                        None,
1847                        None,
1848                    )
1849                    .await;
1850                return;
1851            }
1852        };
1853        let pending = PendingConnection::new(handle);
1854        let acceptor = self.on_connection.as_ref().unwrap();
1855        trace!(%conn_id, "calling acceptor for virtual connection");
1856        match acceptor.accept(&request, pending) {
1857            Ok(()) => {
1858                trace!(%conn_id, "acceptor accepted virtual connection, sending ConnectionAccept");
1859                let _ = self
1860                    .sess_core
1861                    .send(
1862                        Message {
1863                            connection_id: conn_id,
1864                            payload: MessagePayload::ConnectionAccept(
1865                                vox_types::ConnectionAccept {
1866                                    connection_settings: our_settings,
1867                                    metadata: vec![],
1868                                },
1869                            ),
1870                        },
1871                        None,
1872                        None,
1873                    )
1874                    .await;
1875            }
1876            Err(reject_metadata) => {
1877                // Clean up the connection slot we created.
1878                trace!(%conn_id, "acceptor rejected, removing conn slot");
1879                self.conns.remove(&conn_id);
1880                let _ = self
1881                    .sess_core
1882                    .send(
1883                        Message {
1884                            connection_id: conn_id,
1885                            payload: MessagePayload::ConnectionReject(
1886                                vox_types::ConnectionReject {
1887                                    metadata: reject_metadata,
1888                                },
1889                            ),
1890                        },
1891                        None,
1892                        None,
1893                    )
1894                    .await;
1895            }
1896        }
1897    }
1898
1899    fn handle_inbound_accept(
1900        &mut self,
1901        conn_id: ConnectionId,
1902        accept: SelfRef<ConnectionAccept<'static>>,
1903    ) {
1904        let accept = accept.get();
1905        let slot = self.remove_connection(&conn_id);
1906        match slot {
1907            Some(ConnectionSlot::PendingOutbound(mut pending))
1908                if accept.connection_settings.initial_channel_credit == 0 =>
1909            {
1910                if let Some(tx) = pending.result_tx.take() {
1911                    let _ = tx.send(Err(SessionError::Protocol(
1912                        "initial_channel_credit must be greater than zero".into(),
1913                    )));
1914                }
1915            }
1916            Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1917                let handle = self.make_connection_handle(
1918                    conn_id,
1919                    pending.local_settings.clone(),
1920                    accept.connection_settings.clone(),
1921                );
1922
1923                if let Some(tx) = pending.result_tx.take() {
1924                    let _ = tx.send(Ok(handle));
1925                }
1926            }
1927            Some(other) => {
1928                // Not pending outbound — put it back and ignore.
1929                self.conns.insert(conn_id, other);
1930            }
1931            None => {
1932                // No pending open for this ID — ignore.
1933            }
1934        }
1935    }
1936
1937    fn handle_inbound_reject(
1938        &mut self,
1939        conn_id: ConnectionId,
1940        reject: SelfRef<ConnectionReject<'static>>,
1941    ) {
1942        let reject = reject.get();
1943        let slot = self.remove_connection(&conn_id);
1944        match slot {
1945            Some(ConnectionSlot::PendingOutbound(mut pending)) => {
1946                if let Some(tx) = pending.result_tx.take() {
1947                    let _ = tx.send(Err(SessionError::Rejected(vox_types::metadata_into_owned(
1948                        reject.metadata.to_vec(),
1949                    ))));
1950                }
1951            }
1952            Some(other) => {
1953                self.conns.insert(conn_id, other);
1954            }
1955            None => {}
1956        }
1957    }
1958
1959    // r[impl connection.open]
1960    async fn handle_open_request(&mut self, req: OpenRequest) {
1961        if req.settings.initial_channel_credit == 0 {
1962            let _ = req.result_tx.send(Err(SessionError::Protocol(
1963                "initial_channel_credit must be greater than zero".into(),
1964            )));
1965            return;
1966        }
1967
1968        let conn_id = self.conn_ids.alloc();
1969
1970        // Send ConnectionOpen to the peer.
1971        let send_result = self
1972            .sess_core
1973            .send(
1974                Message {
1975                    connection_id: conn_id,
1976                    payload: MessagePayload::ConnectionOpen(ConnectionOpen {
1977                        connection_settings: req.settings.clone(),
1978                        metadata: req.metadata,
1979                    }),
1980                },
1981                None,
1982                None,
1983            )
1984            .await;
1985
1986        if send_result.is_err() {
1987            let _ = req.result_tx.send(Err(SessionError::Protocol(
1988                "failed to send ConnectionOpen".into(),
1989            )));
1990            return;
1991        }
1992
1993        // Store the pending state. The run loop will complete the oneshot
1994        // when ConnectionAccept or ConnectionReject arrives.
1995        self.conns.insert(
1996            conn_id,
1997            ConnectionSlot::PendingOutbound(PendingOutboundData {
1998                local_settings: req.settings,
1999                result_tx: Some(req.result_tx),
2000            }),
2001        );
2002    }
2003
2004    // r[impl connection.close]
2005    async fn handle_close_request(&mut self, req: CloseRequest) {
2006        if req.conn_id.is_root() {
2007            let _ = req.result_tx.send(Err(SessionError::Protocol(
2008                "cannot close root connection".into(),
2009            )));
2010            return;
2011        }
2012
2013        // Remove the connection slot — this drops conn_tx and causes the
2014        // Driver to exit cleanly.
2015        if self
2016            .remove_connection_with_reason(&req.conn_id, ConnectionCloseReason::Local)
2017            .is_none()
2018        {
2019            let _ = req
2020                .result_tx
2021                .send(Err(SessionError::Protocol("connection not found".into())));
2022            return;
2023        }
2024
2025        // Send ConnectionClose to the peer.
2026        let send_result = self
2027            .sess_core
2028            .send(
2029                Message {
2030                    connection_id: req.conn_id,
2031                    payload: MessagePayload::ConnectionClose(ConnectionClose {
2032                        metadata: req.metadata,
2033                    }),
2034                },
2035                None,
2036                None,
2037            )
2038            .await;
2039
2040        if send_result.is_err() {
2041            let _ = req.result_tx.send(Err(SessionError::Protocol(
2042                "failed to send ConnectionClose".into(),
2043            )));
2044            return;
2045        }
2046
2047        let _ = req.result_tx.send(Ok(()));
2048        self.maybe_request_shutdown_after_root_closed();
2049    }
2050
2051    async fn handle_drop_control_request(&mut self, req: DropControlRequest) -> bool {
2052        match req {
2053            DropControlRequest::Shutdown => {
2054                trace!("session shutdown requested");
2055                false
2056            }
2057            DropControlRequest::Close(conn_id) => {
2058                // r[impl rpc.caller.liveness.last-drop-closes-connection]
2059                if conn_id.is_root() {
2060                    // r[impl rpc.caller.liveness.root-internal-close]
2061                    trace!("root callers dropped; internally closing root connection");
2062                    self.root_closed_internal = true;
2063                    // r[impl rpc.caller.liveness.root-teardown-condition]
2064                    return self.has_virtual_connections();
2065                }
2066
2067                if self
2068                    .remove_connection_with_reason(&conn_id, ConnectionCloseReason::Local)
2069                    .is_some()
2070                {
2071                    let _ = self
2072                        .sess_core
2073                        .send(
2074                            Message {
2075                                connection_id: conn_id,
2076                                payload: MessagePayload::ConnectionClose(ConnectionClose {
2077                                    metadata: vec![],
2078                                }),
2079                            },
2080                            None,
2081                            None,
2082                        )
2083                        .await;
2084                }
2085
2086                !self.root_closed_internal || self.has_virtual_connections()
2087            }
2088        }
2089    }
2090
2091    fn has_virtual_connections(&self) -> bool {
2092        self.conns.keys().any(|id| !id.is_root())
2093    }
2094
2095    fn remove_connection(&mut self, conn_id: &ConnectionId) -> Option<ConnectionSlot> {
2096        self.remove_connection_with_reason(conn_id, ConnectionCloseReason::Unknown)
2097    }
2098
2099    fn remove_connection_with_reason(
2100        &mut self,
2101        conn_id: &ConnectionId,
2102        reason: ConnectionCloseReason,
2103    ) -> Option<ConnectionSlot> {
2104        trace!(%conn_id, "remove_connection called");
2105        let slot = self.conns.remove(conn_id);
2106        if let Some(ConnectionSlot::Active(state)) = &slot {
2107            let _ = state.closed_tx.send(Some(reason));
2108            if let Some(observer) = &self.observer {
2109                observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2110                    connection_id: *conn_id,
2111                    reason,
2112                });
2113            }
2114        }
2115        slot
2116    }
2117
2118    // r[impl rpc.observability.session-errors]
2119    fn close_all_connections(&mut self, reason: ConnectionCloseReason) {
2120        trace!(role = ?self.role, count = self.conns.len(), "close_all_connections");
2121        vox_types::dlog!(
2122            "[session {:?}] close_all_connections: {} slots",
2123            self.role,
2124            self.conns.len()
2125        );
2126        for (conn_id, slot) in self.conns.iter() {
2127            if let ConnectionSlot::Active(state) = slot {
2128                vox_types::dlog!("[session {:?}] closing connection {:?}", self.role, conn_id);
2129                let _ = state.closed_tx.send(Some(reason));
2130                if let Some(observer) = &self.observer {
2131                    observer.driver_event(vox_types::DriverEvent::ConnectionClosed {
2132                        connection_id: *conn_id,
2133                        reason,
2134                    });
2135                }
2136            }
2137        }
2138        self.conns.clear();
2139    }
2140
2141    fn maybe_request_shutdown_after_root_closed(&self) {
2142        if self.root_closed_internal && !self.has_virtual_connections() {
2143            let _ = send_drop_control(&self.control_tx, DropControlRequest::Shutdown);
2144        }
2145    }
2146}
2147
2148pub(crate) struct SessionCore {
2149    inner: std::sync::Mutex<SessionCoreInner>,
2150    outbound_tx: tokio_mpsc::Sender<OutboundBatch>,
2151    observer: Option<VoxObserverHandle>,
2152}
2153
2154pub trait OutboundSendFuture: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
2155impl<T> OutboundSendFuture for T where T: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
2156
2157type OutboundSend = Pin<Box<dyn OutboundSendFuture>>;
2158
2159#[derive(Clone)]
2160struct PendingSchemaSend {
2161    method_id: vox_types::MethodId,
2162    direction: vox_types::BindingDirection,
2163    prepared: vox_types::PreparedSchemaPlan,
2164}
2165
2166struct OutboundBatch {
2167    conn_id: ConnectionId,
2168    conn_state: Arc<std::sync::Mutex<SendConnState>>,
2169    tx: Arc<dyn DynConduitTx>,
2170    schema_sends: Vec<PendingSchemaSend>,
2171    payload_send: OutboundSend,
2172    result_tx: tokio_oneshot::Sender<std::io::Result<()>>,
2173}
2174
2175async fn run_outbound_worker(mut rx: tokio_mpsc::Receiver<OutboundBatch>) {
2176    while let Some(batch) = rx.recv().await {
2177        let mut result = Ok(());
2178        for schema_send in batch.schema_sends {
2179            let schemas = {
2180                let mut conn_state = batch
2181                    .conn_state
2182                    .lock()
2183                    .expect("send conn state mutex poisoned");
2184                conn_state.send_tracker.preview_prepared_plan(
2185                    schema_send.method_id,
2186                    schema_send.direction,
2187                    &schema_send.prepared,
2188                )
2189            };
2190            if schemas.is_empty() {
2191                continue;
2192            }
2193
2194            let schema_msg = Message {
2195                connection_id: batch.conn_id,
2196                payload: MessagePayload::SchemaMessage(SchemaMessage {
2197                    method_id: schema_send.method_id,
2198                    direction: schema_send.direction,
2199                    schemas,
2200                }),
2201            };
2202            let send = match batch.tx.clone().prepare_msg(schema_msg, None) {
2203                Ok(send) => send,
2204                Err(error) => {
2205                    result = Err(error);
2206                    break;
2207                }
2208            };
2209            if let Err(error) = send.await {
2210                result = Err(error);
2211                break;
2212            }
2213            let mut conn_state = batch
2214                .conn_state
2215                .lock()
2216                .expect("send conn state mutex poisoned");
2217            conn_state.send_tracker.mark_prepared_plan_sent(
2218                schema_send.method_id,
2219                schema_send.direction,
2220                &schema_send.prepared,
2221            );
2222            conn_state
2223                .planned_bindings
2224                .remove(&(schema_send.direction, schema_send.method_id));
2225        }
2226        if result.is_ok()
2227            && let Err(error) = batch.payload_send.await
2228        {
2229            result = Err(error);
2230        }
2231        let _ = batch.result_tx.send(result);
2232    }
2233}
2234
2235#[cfg(not(target_arch = "wasm32"))]
2236fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
2237    if tokio::runtime::Handle::try_current().is_ok() {
2238        tokio::spawn(run_outbound_worker(rx));
2239        return;
2240    }
2241
2242    std::thread::spawn(move || {
2243        let runtime = tokio::runtime::Builder::new_current_thread()
2244            .enable_all()
2245            .build()
2246            .expect("build outbound worker runtime");
2247        runtime.block_on(run_outbound_worker(rx));
2248    });
2249}
2250
/// Starts the outbound worker for `rx` on wasm32 by spawning it as a local
/// task via `wasm_bindgen_futures`.
#[cfg(target_arch = "wasm32")]
fn spawn_outbound_worker(rx: tokio_mpsc::Receiver<OutboundBatch>) {
    let worker = run_outbound_worker(rx);
    wasm_bindgen_futures::spawn_local(worker);
}
2255
2256struct SendConnState {
2257    /// Tracks which schemas we have sent on this connection.
2258    send_tracker: vox_types::SchemaSendTracker,
2259
2260    /// Maps request_id → method_id for in-flight incoming calls, so we can
2261    /// look up the method_id when sending the response.
2262    inflight_incoming: HashMap<RequestId, vox_types::MethodId>,
2263
2264    /// Maps request_id → method_id for outbound calls awaiting a response, so
2265    /// inbound response schema payloads can bind their root TypeRef.
2266    inflight_outgoing: HashMap<RequestId, vox_types::MethodId>,
2267
2268    /// Structured schema plans cached per binding until the first committed send.
2269    planned_bindings:
2270        HashMap<(vox_types::BindingDirection, vox_types::MethodId), vox_types::PreparedSchemaPlan>,
2271}
2272
2273impl SendConnState {
2274    fn new() -> Self {
2275        SendConnState {
2276            send_tracker: vox_types::SchemaSendTracker::new(),
2277            inflight_incoming: HashMap::new(),
2278            inflight_outgoing: HashMap::new(),
2279            planned_bindings: HashMap::new(),
2280        }
2281    }
2282}
2283
2284struct SessionCoreInner {
2285    /// Underlying conduit (tx end)
2286    tx: Arc<dyn DynConduitTx>,
2287
2288    /// Per-connection state re: sent schemas, etc.
2289    conns: HashMap<ConnectionId, Arc<std::sync::Mutex<SendConnState>>>,
2290}
2291
2292fn get_or_create_send_conn_state(
2293    inner: &mut SessionCoreInner,
2294    conn_id: ConnectionId,
2295) -> Arc<std::sync::Mutex<SendConnState>> {
2296    inner
2297        .conns
2298        .entry(conn_id)
2299        .or_insert_with(|| Arc::new(std::sync::Mutex::new(SendConnState::new())))
2300        .clone()
2301}
2302
2303impl SessionCore {
2304    pub(crate) fn outbound_queue_stats(&self) -> (usize, usize) {
2305        let capacity = self.outbound_tx.max_capacity();
2306        let available = self.outbound_tx.capacity();
2307        (capacity.saturating_sub(available), capacity)
2308    }
2309
2310    fn prepare_outbound_batch<'a>(
2311        &self,
2312        mut msg: Message<'a>,
2313        binder: Option<&'a dyn vox_types::ChannelBinder>,
2314        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2315    ) -> Result<(OutboundBatch, tokio_oneshot::Receiver<std::io::Result<()>>), ()> {
2316        let conn_id = msg.connection_id;
2317        let (tx, conn_state, schema_sends) = {
2318            let mut inner = self.inner.lock().expect("session core mutex poisoned");
2319            let tx = inner.tx.clone();
2320            let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2321            drop(inner);
2322
2323            if let MessagePayload::RequestMessage(req) = &mut msg.payload {
2324                vox_types::dlog!(
2325                    "[session-core] send request: conn={:?} req={:?} body={} forwarded={}",
2326                    conn_id,
2327                    req.id,
2328                    match &req.body {
2329                        RequestBody::Call(_) => "Call",
2330                        RequestBody::Response(_) => "Response",
2331                        RequestBody::Cancel(_) => "Cancel",
2332                    },
2333                    forwarded_schemas.is_some()
2334                );
2335                let schema_sends = {
2336                    let mut conn_state_guard =
2337                        conn_state.lock().expect("send conn state mutex poisoned");
2338                    let mut schema_sends = Vec::new();
2339                    match &mut req.body {
2340                        RequestBody::Call(call) => {
2341                            if let Some(schema_send) = Self::plan_call_schema_send(
2342                                &mut conn_state_guard,
2343                                req.id,
2344                                call.method_id,
2345                                call,
2346                                forwarded_schemas,
2347                            ) {
2348                                schema_sends.push(schema_send);
2349                            }
2350                            call.schemas = Default::default();
2351                        }
2352                        RequestBody::Response(resp) => {
2353                            if let Some(method_id) =
2354                                conn_state_guard.inflight_incoming.remove(&req.id)
2355                                && let Some(schema_send) = Self::plan_response_schema_send(
2356                                    &mut conn_state_guard,
2357                                    req.id,
2358                                    method_id,
2359                                    resp,
2360                                    forwarded_schemas,
2361                                )
2362                            {
2363                                schema_sends.push(schema_send);
2364                            }
2365                            resp.schemas = Default::default();
2366                        }
2367                        RequestBody::Cancel(_) => {}
2368                    }
2369                    schema_sends
2370                };
2371                (tx, conn_state, schema_sends)
2372            } else {
2373                (tx, conn_state, Vec::new())
2374            }
2375        };
2376        let payload_send = tx.clone().prepare_msg(msg, binder).map_err(|_| ())?;
2377
2378        let (result_tx, result_rx) = tokio_oneshot::channel();
2379        Ok((
2380            OutboundBatch {
2381                conn_id,
2382                conn_state,
2383                tx,
2384                schema_sends,
2385                payload_send,
2386                result_tx,
2387            },
2388            result_rx,
2389        ))
2390    }
2391
2392    // r[impl schema.principles.sender-driven]
2393    pub(crate) async fn send<'a>(
2394        &self,
2395        msg: Message<'a>,
2396        binder: Option<&'a dyn vox_types::ChannelBinder>,
2397        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2398    ) -> Result<(), ()> {
2399        let connection_id = msg.connection_id;
2400        let (batch, result_rx) = self.prepare_outbound_batch(msg, binder, forwarded_schemas)?;
2401        if self.outbound_tx.send(batch).await.is_err() {
2402            if let Some(observer) = &self.observer {
2403                observer
2404                    .driver_event(vox_types::DriverEvent::OutboundQueueClosed { connection_id });
2405            }
2406            return Err(());
2407        }
2408        let result = result_rx.await.map_err(|_| ());
2409        match result? {
2410            Ok(()) => Ok(()),
2411            Err(_) => {
2412                if let Some(observer) = &self.observer {
2413                    observer.driver_event(vox_types::DriverEvent::EncodeError {
2414                        connection_id,
2415                        kind: vox_types::EncodeErrorKind::Transport,
2416                    });
2417                }
2418                Err(())
2419            }
2420        }
2421    }
2422
2423    // r[impl rpc.flow-control.credit.try-send]
2424    pub(crate) fn try_send<'a>(
2425        &self,
2426        msg: Message<'a>,
2427        binder: Option<&'a dyn vox_types::ChannelBinder>,
2428        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2429    ) -> Result<(), TrySendError<()>> {
2430        let connection_id = msg.connection_id;
2431        let (batch, _result_rx) = self
2432            .prepare_outbound_batch(msg, binder, forwarded_schemas)
2433            .map_err(|_| TrySendError::Closed(()))?;
2434        self.outbound_tx.try_send(batch).map_err(|err| match err {
2435            tokio_mpsc::error::TrySendError::Full(_) => {
2436                if let Some(observer) = &self.observer {
2437                    observer
2438                        .driver_event(vox_types::DriverEvent::OutboundQueueFull { connection_id });
2439                }
2440                TrySendError::Full(())
2441            }
2442            tokio_mpsc::error::TrySendError::Closed(_) => {
2443                if let Some(observer) = &self.observer {
2444                    observer.driver_event(vox_types::DriverEvent::OutboundQueueClosed {
2445                        connection_id,
2446                    });
2447                }
2448                TrySendError::Closed(())
2449            }
2450        })
2451    }
2452
2453    /// Record that an incoming call was received, so we can look up the
2454    /// method_id when sending the response.
2455    pub(crate) fn record_incoming_call(
2456        &self,
2457        conn_id: ConnectionId,
2458        request_id: RequestId,
2459        method_id: vox_types::MethodId,
2460    ) {
2461        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2462        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2463        vox_types::dlog!(
2464            "[schema] record_incoming_call: conn={:?} req={:?} method={:?}",
2465            conn_id,
2466            request_id,
2467            method_id
2468        );
2469        conn_state
2470            .lock()
2471            .expect("send conn state mutex poisoned")
2472            .inflight_incoming
2473            .insert(request_id, method_id);
2474    }
2475
2476    pub(crate) fn take_outgoing_call_method(
2477        &self,
2478        conn_id: ConnectionId,
2479        request_id: RequestId,
2480    ) -> Option<vox_types::MethodId> {
2481        let inner = self.inner.lock().expect("session core mutex poisoned");
2482        inner.conns.get(&conn_id).and_then(|conn_state| {
2483            conn_state
2484                .lock()
2485                .expect("send conn state mutex poisoned")
2486                .inflight_outgoing
2487                .remove(&request_id)
2488        })
2489    }
2490
2491    pub(crate) fn prepare_response_for_method(
2492        &self,
2493        conn_id: ConnectionId,
2494        request_id: RequestId,
2495        method_id: vox_types::MethodId,
2496        response: &mut RequestResponse<'_>,
2497    ) {
2498        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2499        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2500        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2501        let key = (vox_types::BindingDirection::Response, method_id);
2502        if conn_state
2503            .send_tracker
2504            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2505        {
2506            response.schemas = Default::default();
2507            return;
2508        }
2509
2510        let prepared = match &response.ret {
2511            vox_types::Payload::Value { shape, .. } => {
2512                match Self::get_or_plan_binding_for_shape(
2513                    &mut conn_state,
2514                    key,
2515                    request_id,
2516                    "response",
2517                    shape,
2518                ) {
2519                    Some(prepared) => prepared,
2520                    None => return,
2521                }
2522            }
2523            vox_types::Payload::PostcardBytes(_) => {
2524                tracing::error!(
2525                    "schema attachment failed: missing forwarded response schemas for method {:?}",
2526                    method_id
2527                );
2528                return;
2529            }
2530        };
2531        response.schemas = prepared.to_cbor();
2532    }
2533
2534    /// Prepare response schemas from an explicit canonical root type and schema source.
2535    pub(crate) fn prepare_response_from_source(
2536        &self,
2537        conn_id: ConnectionId,
2538        _request_id: RequestId,
2539        method_id: vox_types::MethodId,
2540        root_type: &vox_types::TypeRef,
2541        source: &dyn vox_types::SchemaSource,
2542        response: &mut RequestResponse<'_>,
2543    ) {
2544        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2545        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2546        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2547        let key = (vox_types::BindingDirection::Response, method_id);
2548        if conn_state
2549            .send_tracker
2550            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2551        {
2552            response.schemas = Default::default();
2553            return;
2554        }
2555        let prepared =
2556            Self::get_or_plan_binding_from_source(&mut conn_state, key, root_type, source);
2557        response.schemas = prepared.to_cbor();
2558    }
2559
2560    /// Prepare response schemas for a replay using the running code's
2561    /// static `&'static Shape` for the response type. Same connection-
2562    /// scoped dedup as fresh responses.
2563    pub(crate) fn prepare_response_from_shape(
2564        &self,
2565        conn_id: ConnectionId,
2566        request_id: RequestId,
2567        method_id: vox_types::MethodId,
2568        response_shape: &'static Shape,
2569        response: &mut RequestResponse<'_>,
2570    ) {
2571        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2572        let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
2573        let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
2574        let key = (vox_types::BindingDirection::Response, method_id);
2575        if conn_state
2576            .send_tracker
2577            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2578        {
2579            response.schemas = Default::default();
2580            return;
2581        }
2582        let prepared = match Self::get_or_plan_binding_for_shape(
2583            &mut conn_state,
2584            key,
2585            request_id,
2586            "response",
2587            response_shape,
2588        ) {
2589            Some(prepared) => prepared,
2590            None => return,
2591        };
2592        response.schemas = prepared.to_cbor();
2593    }
2594
2595    fn get_or_plan_binding_for_shape(
2596        conn_state: &mut SendConnState,
2597        key: (vox_types::BindingDirection, vox_types::MethodId),
2598        request_id: RequestId,
2599        kind: &str,
2600        shape: &'static Shape,
2601    ) -> Option<vox_types::PreparedSchemaPlan> {
2602        if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2603            return Some(prepared.clone());
2604        }
2605        match vox_types::SchemaSendTracker::plan_for_shape(shape) {
2606            Ok(prepared) => {
2607                vox_types::dlog!(
2608                    "[schema] planned {} {} schemas for method {:?} (req {:?})",
2609                    prepared.schemas.len(),
2610                    kind,
2611                    key.1,
2612                    request_id
2613                );
2614                conn_state.planned_bindings.insert(key, prepared.clone());
2615                Some(prepared)
2616            }
2617            Err(e) => {
2618                tracing::error!("schema extraction failed: {e}");
2619                None
2620            }
2621        }
2622    }
2623
2624    fn get_or_plan_binding_from_source(
2625        conn_state: &mut SendConnState,
2626        key: (vox_types::BindingDirection, vox_types::MethodId),
2627        root_type: &vox_types::TypeRef,
2628        source: &dyn vox_types::SchemaSource,
2629    ) -> vox_types::PreparedSchemaPlan {
2630        if let Some(prepared) = conn_state.planned_bindings.get(&key) {
2631            return prepared.clone();
2632        }
2633        let prepared = vox_types::SchemaSendTracker::plan_from_source(root_type, source);
2634        conn_state.planned_bindings.insert(key, prepared.clone());
2635        prepared
2636    }
2637
2638    fn plan_response_schema_send(
2639        conn_state: &mut SendConnState,
2640        request_id: RequestId,
2641        method_id: vox_types::MethodId,
2642        response: &mut RequestResponse<'_>,
2643        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2644    ) -> Option<PendingSchemaSend> {
2645        if conn_state
2646            .send_tracker
2647            .has_sent_binding(method_id, vox_types::BindingDirection::Response)
2648        {
2649            response.schemas = Default::default();
2650            return None;
2651        }
2652
2653        let key = (vox_types::BindingDirection::Response, method_id);
2654        let prepared = if !response.schemas.is_empty() {
2655            conn_state
2656                .planned_bindings
2657                .get(&key)
2658                .cloned()
2659                .unwrap_or_else(|| {
2660                    let prepared_payload = vox_types::SchemaPayload::from_cbor(&response.schemas.0)
2661                        .expect("prepared schema payloads must be valid CBOR");
2662                    vox_types::PreparedSchemaPlan {
2663                        schemas: prepared_payload.schemas,
2664                        root: prepared_payload.root,
2665                    }
2666                })
2667        } else {
2668            match &response.ret {
2669                vox_types::Payload::Value { shape, .. } => Self::get_or_plan_binding_for_shape(
2670                    conn_state, key, request_id, "response", shape,
2671                )?,
2672                vox_types::Payload::PostcardBytes(_) => {
2673                    let Some(source) = forwarded_schemas else {
2674                        tracing::error!(
2675                            "schema attachment failed: missing forwarded response schemas for method {:?}",
2676                            method_id
2677                        );
2678                        return None;
2679                    };
2680                    let Some(root) = source.get_remote_response_root(method_id) else {
2681                        tracing::error!(
2682                            "schema attachment failed: missing forwarded response root for method {:?}",
2683                            method_id
2684                        );
2685                        return None;
2686                    };
2687                    Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2688                }
2689            }
2690        };
2691
2692        Some(PendingSchemaSend {
2693            method_id,
2694            direction: vox_types::BindingDirection::Response,
2695            prepared,
2696        })
2697    }
2698
2699    fn plan_call_schema_send(
2700        conn_state: &mut SendConnState,
2701        request_id: RequestId,
2702        method_id: vox_types::MethodId,
2703        call: &mut vox_types::RequestCall<'_>,
2704        forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
2705    ) -> Option<PendingSchemaSend> {
2706        conn_state.inflight_outgoing.insert(request_id, method_id);
2707        if conn_state
2708            .send_tracker
2709            .has_sent_binding(method_id, vox_types::BindingDirection::Args)
2710        {
2711            call.schemas = Default::default();
2712            return None;
2713        }
2714
2715        let key = (vox_types::BindingDirection::Args, method_id);
2716        let prepared = match &call.args {
2717            vox_types::Payload::Value { shape, .. } => {
2718                Self::get_or_plan_binding_for_shape(conn_state, key, request_id, "args", shape)?
2719            }
2720            vox_types::Payload::PostcardBytes(_) => {
2721                let Some(source) = forwarded_schemas else {
2722                    tracing::error!(
2723                        "schema attachment failed: missing forwarded args schemas for method {:?}",
2724                        method_id
2725                    );
2726                    return None;
2727                };
2728                let Some(root) = source.get_remote_args_root(method_id) else {
2729                    tracing::error!(
2730                        "schema attachment failed: missing forwarded args root for method {:?}",
2731                        method_id
2732                    );
2733                    return None;
2734                };
2735                Self::get_or_plan_binding_from_source(conn_state, key, &root, source)
2736            }
2737        };
2738
2739        Some(PendingSchemaSend {
2740            method_id,
2741            direction: vox_types::BindingDirection::Args,
2742            prepared,
2743        })
2744    }
2745
2746    fn replace_tx_and_reset_schemas(&self, tx: Arc<dyn DynConduitTx>) {
2747        let mut inner = self.inner.lock().expect("session core mutex poisoned");
2748        inner.tx = tx;
2749        inner.conns.clear();
2750    }
2751}
2752
2753pub(crate) struct RecoveredConduit {
2754    pub tx: Arc<dyn DynConduitTx>,
2755    pub rx: Box<dyn DynConduitRx>,
2756    pub handshake: HandshakeResult,
2757}
2758
2759pub(crate) trait ConduitRecoverer: MaybeSend {
2760    fn next_conduit<'a>(
2761        &'a mut self,
2762        resume_key: Option<&'a SessionResumeKey>,
2763    ) -> BoxFut<'a, Result<RecoveredConduit, SessionError>>;
2764}
2765
2766pub trait DynConduitTx: MaybeSend + MaybeSync {
2767    fn prepare_msg<'a>(
2768        self: Arc<Self>,
2769        msg: Message<'a>,
2770        binder: Option<&'a dyn vox_types::ChannelBinder>,
2771    ) -> std::io::Result<OutboundSend>;
2772}
2773pub trait DynConduitRx: MaybeSend {
2774    fn recv_msg<'a>(&'a mut self)
2775    -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>>;
2776}
2777
2778// r[impl zerocopy.send]
2779// r[impl zerocopy.framing.pipeline.outgoing]
2780impl<T> DynConduitTx for T
2781where
2782    T: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
2783{
2784    fn prepare_msg<'a>(
2785        self: Arc<Self>,
2786        msg: Message<'a>,
2787        binder: Option<&'a dyn vox_types::ChannelBinder>,
2788    ) -> std::io::Result<OutboundSend> {
2789        let prepared = if let Some(binder) = binder {
2790            vox_types::with_channel_binder(binder, || self.prepare_send(msg))
2791        } else {
2792            self.prepare_send(msg)
2793        };
2794        let prepared = prepared.map_err(|e| std::io::Error::other(e.to_string()))?;
2795        Ok(Box::pin(async move {
2796            self.send_prepared(prepared)
2797                .await
2798                .map_err(|e| std::io::Error::other(e.to_string()))
2799        }))
2800    }
2801}
2802
2803impl<T> DynConduitRx for T
2804where
2805    T: ConduitRx<Msg = MessageFamily> + MaybeSend,
2806{
2807    fn recv_msg<'a>(
2808        &'a mut self,
2809    ) -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>> {
2810        Box::pin(async move {
2811            self.recv()
2812                .await
2813                .map_err(|error| std::io::Error::other(error.to_string()))
2814        })
2815    }
2816}
2817
2818#[cfg(test)]
2819mod tests {
2820    use moire::sync::mpsc;
2821    use vox_types::{
2822        Backing, Conduit, ConnectionAccept, ConnectionReject, HandshakeResult, SelfRef,
2823    };
2824
2825    use super::*;
2826
2827    fn make_session() -> Session {
2828        let (a, b) = crate::memory_link_pair(32);
2829        // Keep the peer link alive so sess_core sends don't fail with broken pipe.
2830        std::mem::forget(b);
2831        let conduit = crate::BareConduit::new(a);
2832        let (tx, rx) = conduit.split();
2833        let (_open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open.test", 4);
2834        let (_close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close.test", 4);
2835        let (_resume_tx, resume_rx) = mpsc::channel::<ResumeRequest>("session.resume.test", 1);
2836        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control.test");
2837        Session::pre_handshake(
2838            tx, rx, None, open_rx, close_rx, resume_rx, control_tx, control_rx, None, false, None,
2839            None, None,
2840        )
2841    }
2842
2843    fn resumed_handshake(
2844        our_settings: ConnectionSettings,
2845        peer_settings: ConnectionSettings,
2846    ) -> HandshakeResult {
2847        HandshakeResult {
2848            role: SessionRole::Initiator,
2849            our_settings,
2850            peer_settings,
2851            peer_supports_retry: true,
2852            session_resume_key: Some(SessionResumeKey([7; 16])),
2853            peer_resume_key: None,
2854            our_schema: vec![],
2855            peer_schema: vec![],
2856            peer_metadata: vec![],
2857        }
2858    }
2859
2860    fn accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2861        SelfRef::owning(
2862            Backing::Boxed(Box::<[u8]>::default()),
2863            ConnectionAccept {
2864                connection_settings: ConnectionSettings {
2865                    parity: Parity::Even,
2866                    max_concurrent_requests: 64,
2867                    initial_channel_credit: 16,
2868                },
2869                metadata: vec![],
2870            },
2871        )
2872    }
2873
2874    fn zero_credit_accept_ref() -> SelfRef<ConnectionAccept<'static>> {
2875        SelfRef::owning(
2876            Backing::Boxed(Box::<[u8]>::default()),
2877            ConnectionAccept {
2878                connection_settings: ConnectionSettings {
2879                    parity: Parity::Even,
2880                    max_concurrent_requests: 64,
2881                    initial_channel_credit: 0,
2882                },
2883                metadata: vec![],
2884            },
2885        )
2886    }
2887
2888    fn reject_ref() -> SelfRef<ConnectionReject<'static>> {
2889        SelfRef::owning(
2890            Backing::Boxed(Box::<[u8]>::default()),
2891            ConnectionReject { metadata: vec![] },
2892        )
2893    }
2894
2895    #[tokio::test]
2896    async fn duplicate_connection_accept_is_ignored_after_first() {
2897        let mut session = make_session();
2898        let conn_id = ConnectionId(1);
2899        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2900
2901        session.conns.insert(
2902            conn_id,
2903            ConnectionSlot::PendingOutbound(PendingOutboundData {
2904                local_settings: ConnectionSettings {
2905                    parity: Parity::Odd,
2906                    max_concurrent_requests: 64,
2907                    initial_channel_credit: 16,
2908                },
2909                result_tx: Some(result_tx),
2910            }),
2911        );
2912
2913        session.handle_inbound_accept(conn_id, accept_ref());
2914        let handle = result_rx
2915            .await
2916            .expect("pending outbound result should resolve")
2917            .expect("accept should resolve as Ok");
2918        assert_eq!(handle.connection_id(), conn_id);
2919
2920        session.handle_inbound_accept(conn_id, accept_ref());
2921        assert!(
2922            matches!(
2923                session.conns.get(&conn_id),
2924                Some(ConnectionSlot::Active(ConnectionState { id, .. })) if *id == conn_id
2925            ),
2926            "duplicate accept should keep existing active connection state"
2927        );
2928    }
2929
2930    #[tokio::test]
2931    async fn duplicate_connection_reject_is_ignored_after_first() {
2932        let mut session = make_session();
2933        let conn_id = ConnectionId(1);
2934        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2935
2936        session.conns.insert(
2937            conn_id,
2938            ConnectionSlot::PendingOutbound(PendingOutboundData {
2939                local_settings: ConnectionSettings {
2940                    parity: Parity::Odd,
2941                    max_concurrent_requests: 64,
2942                    initial_channel_credit: 16,
2943                },
2944                result_tx: Some(result_tx),
2945            }),
2946        );
2947
2948        session.handle_inbound_reject(conn_id, reject_ref());
2949        let result = result_rx
2950            .await
2951            .expect("pending outbound result should resolve");
2952        assert!(
2953            matches!(result, Err(SessionError::Rejected(_))),
2954            "expected rejection, got: {result:?}"
2955        );
2956
2957        session.handle_inbound_reject(conn_id, reject_ref());
2958        assert!(
2959            !session.conns.contains_key(&conn_id),
2960            "duplicate reject should not recreate connection state"
2961        );
2962    }
2963
2964    // r[verify rpc.flow-control.credit.initial.zero]
2965    #[tokio::test]
2966    async fn inbound_accept_with_zero_initial_credit_rejects_pending_open() {
2967        let mut session = make_session();
2968        let conn_id = ConnectionId(1);
2969        let (result_tx, result_rx) = moire::sync::oneshot::channel("session.test.open_result");
2970
2971        session.conns.insert(
2972            conn_id,
2973            ConnectionSlot::PendingOutbound(PendingOutboundData {
2974                local_settings: ConnectionSettings {
2975                    parity: Parity::Odd,
2976                    max_concurrent_requests: 64,
2977                    initial_channel_credit: 16,
2978                },
2979                result_tx: Some(result_tx),
2980            }),
2981        );
2982
2983        session.handle_inbound_accept(conn_id, zero_credit_accept_ref());
2984        let result = result_rx
2985            .await
2986            .expect("pending outbound result should resolve");
2987        assert!(
2988            matches!(
2989                result,
2990                Err(SessionError::Protocol(ref message))
2991                    if message == "initial_channel_credit must be greater than zero"
2992            ),
2993            "expected zero-credit protocol error, got: {result:?}"
2994        );
2995        assert!(
2996            !session.conns.contains_key(&conn_id),
2997            "zero-credit accept should not create an active connection"
2998        );
2999    }
3000
3001    #[test]
3002    fn out_of_order_accept_or_reject_without_pending_is_ignored() {
3003        let mut session = make_session();
3004        let conn_id = ConnectionId(99);
3005
3006        session.handle_inbound_accept(conn_id, accept_ref());
3007        session.handle_inbound_reject(conn_id, reject_ref());
3008
3009        assert!(
3010            session.conns.is_empty(),
3011            "out-of-order accept/reject should not mutate empty connection table"
3012        );
3013    }
3014
3015    #[tokio::test]
3016    async fn close_request_clears_pending_outbound_open() {
3017        let mut session = make_session();
3018        let (open_result_tx, open_result_rx) = moire::sync::oneshot::channel("session.open.result");
3019        let (close_result_tx, close_result_rx) =
3020            moire::sync::oneshot::channel("session.close.result");
3021
3022        session.conns.insert(
3023            ConnectionId(1),
3024            ConnectionSlot::PendingOutbound(PendingOutboundData {
3025                local_settings: ConnectionSettings {
3026                    parity: Parity::Odd,
3027                    max_concurrent_requests: 64,
3028                    initial_channel_credit: 16,
3029                },
3030                result_tx: Some(open_result_tx),
3031            }),
3032        );
3033
3034        session
3035            .handle_close_request(CloseRequest {
3036                conn_id: ConnectionId(1),
3037                metadata: vec![],
3038                result_tx: close_result_tx,
3039            })
3040            .await;
3041
3042        let close_result = close_result_rx
3043            .await
3044            .expect("close result should be delivered");
3045        assert!(
3046            close_result.is_ok(),
3047            "close should succeed for pending outbound connection"
3048        );
3049
3050        assert!(
3051            open_result_rx.await.is_err(),
3052            "pending open result channel should be closed once the pending slot is removed"
3053        );
3054    }
3055
3056    #[test]
3057    fn resume_rejects_changed_local_root_settings() {
3058        let mut session = make_session();
3059        let local_settings = ConnectionSettings {
3060            parity: Parity::Odd,
3061            max_concurrent_requests: 64,
3062            initial_channel_credit: 16,
3063        };
3064        let peer_settings = ConnectionSettings {
3065            parity: Parity::Even,
3066            max_concurrent_requests: 64,
3067            initial_channel_credit: 16,
3068        };
3069        let _root = session
3070            .establish_from_handshake(resumed_handshake(
3071                local_settings.clone(),
3072                peer_settings.clone(),
3073            ))
3074            .expect("initial handshake should establish session");
3075
3076        let (link_a, _link_b) = crate::memory_link_pair(32);
3077        let conduit = crate::BareConduit::new(link_a);
3078        let (tx, rx) = conduit.split();
3079
3080        let result = session.resume_from_handshake(
3081            Arc::new(tx),
3082            Box::new(rx),
3083            resumed_handshake(
3084                ConnectionSettings {
3085                    parity: Parity::Odd,
3086                    max_concurrent_requests: 65,
3087                    initial_channel_credit: 16,
3088                },
3089                peer_settings,
3090            ),
3091        );
3092
3093        assert!(
3094            matches!(
3095                &result,
3096                Err(SessionError::Protocol(message))
3097                    if message == "local root settings changed across session resume"
3098            ),
3099            "expected local-root-settings mismatch, got: {result:?}"
3100        );
3101    }
3102
3103    #[test]
3104    fn resume_rejects_changed_peer_root_settings() {
3105        let mut session = make_session();
3106        let local_settings = ConnectionSettings {
3107            parity: Parity::Odd,
3108            max_concurrent_requests: 64,
3109            initial_channel_credit: 16,
3110        };
3111        let peer_settings = ConnectionSettings {
3112            parity: Parity::Even,
3113            max_concurrent_requests: 64,
3114            initial_channel_credit: 16,
3115        };
3116        let _root = session
3117            .establish_from_handshake(resumed_handshake(
3118                local_settings.clone(),
3119                peer_settings.clone(),
3120            ))
3121            .expect("initial handshake should establish session");
3122
3123        let (link_a, _link_b) = crate::memory_link_pair(32);
3124        let conduit = crate::BareConduit::new(link_a);
3125        let (tx, rx) = conduit.split();
3126
3127        let result = session.resume_from_handshake(
3128            Arc::new(tx),
3129            Box::new(rx),
3130            resumed_handshake(
3131                local_settings,
3132                ConnectionSettings {
3133                    parity: Parity::Even,
3134                    max_concurrent_requests: 65,
3135                    initial_channel_credit: 16,
3136                },
3137            ),
3138        );
3139
3140        assert!(
3141            matches!(
3142                &result,
3143                Err(SessionError::Protocol(message))
3144                    if message == "peer root settings changed across session resume"
3145            ),
3146            "expected peer-root-settings mismatch, got: {result:?}"
3147        );
3148    }
3149}