Skip to main content

vox_core/session/
builders.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use moire::sync::mpsc;
10use vox_types::{
11    Conduit, ConduitTx, ConnectionSettings, Handler, HandshakeResult, Link, MaybeSend, MaybeSync,
12    MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink,
13};
14
15#[cfg(not(target_arch = "wasm32"))]
16use crate::{Attachment, LinkSource, StableConduit};
17use crate::{
18    BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
19    handshake_as_acceptor, handshake_as_initiator, initiate_transport,
20};
21
22use super::{
23    CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
24    SessionHandle, SessionKeepaliveConfig,
25};
26use crate::{Driver, DriverCaller, DriverReplySink};
27
/// A pinned, boxed session future. On non-WASM this is `Send + 'static`;
/// on WASM it's `'static` only (no `Send` requirement).
///
/// This is the argument type of the spawn function that the session builders
/// use to launch the session's background task.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// WASM flavour of the boxed session future: same shape, without `Send`.
#[cfg(target_arch = "wasm32")]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
34
/// One-shot spawner that takes ownership of a [`BoxSessionFuture`] and is
/// responsible for running it. On non-WASM targets the spawner itself must be
/// `Send`.
#[cfg(not(target_arch = "wasm32"))]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
/// WASM flavour of the one-shot spawner: no `Send` bound on the closure.
#[cfg(target_arch = "wasm32")]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
39
40#[cfg(not(target_arch = "wasm32"))]
41fn default_spawn_fn() -> SpawnFn {
42    Box::new(|fut| {
43        tokio::spawn(fut);
44    })
45}
46
47#[cfg(target_arch = "wasm32")]
48fn default_spawn_fn() -> SpawnFn {
49    Box::new(|fut| {
50        wasm_bindgen_futures::spawn_local(fut);
51    })
52}
53
54// r[impl rpc.session-setup]
55// r[impl session.role]
56pub fn initiator_conduit<I: IntoConduit>(
57    into_conduit: I,
58    handshake_result: HandshakeResult,
59) -> SessionInitiatorBuilder<'static, I::Conduit> {
60    SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
61}
62
/// Starts an initiator-side session builder that obtains its link(s) from a
/// [`LinkSource`], using the given transport `mode`.
///
/// Only available on non-WASM targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
where
    S: LinkSource,
{
    SessionSourceInitiatorBuilder::new(source, mode)
}
70
71pub fn acceptor_conduit<I: IntoConduit>(
72    into_conduit: I,
73    handshake_result: HandshakeResult,
74) -> SessionAcceptorBuilder<'static, I::Conduit> {
75    SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
76}
77
78/// Convenience: perform CBOR handshake as initiator on a raw link, then return
79/// a builder with the conduit ready to go.
80pub async fn initiator_on_link<L: Link>(
81    link: L,
82    settings: ConnectionSettings,
83) -> Result<
84    SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
85    SessionError,
86>
87where
88    L::Tx: MaybeSend + MaybeSync + 'static,
89    L::Rx: MaybeSend + 'static,
90{
91    let (tx, mut rx) = link.split();
92    let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None)
93        .await
94        .map_err(session_error_from_handshake)?;
95    let message_plan =
96        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
97    Ok(SessionInitiatorBuilder::new(
98        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
99        handshake_result,
100    ))
101}
102
103/// Convenience: perform CBOR handshake as acceptor on a raw link, then return
104/// a builder with the conduit ready to go.
105pub async fn acceptor_on_link<L: Link>(
106    link: L,
107    settings: ConnectionSettings,
108) -> Result<
109    SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
110    SessionError,
111>
112where
113    L::Tx: MaybeSend + MaybeSync + 'static,
114    L::Rx: MaybeSend + 'static,
115{
116    let (tx, mut rx) = link.split();
117    let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None)
118        .await
119        .map_err(session_error_from_handshake)?;
120    let message_plan =
121        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
122    Ok(SessionAcceptorBuilder::new(
123        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
124        handshake_result,
125    ))
126}
127
/// Starts an initiator-side session builder over a single raw `link`, using
/// the given transport `mode`. Transport setup and the handshake are deferred
/// until the builder's `establish` is called.
pub fn initiator_on<L: Link>(
    link: L,
    mode: TransportMode,
) -> SessionTransportInitiatorBuilder<'static, L> {
    SessionTransportInitiatorBuilder::new(link, mode)
}
134
/// Alias for [`initiator_on`]: forwards both arguments unchanged.
pub fn initiator_transport<L: Link>(
    link: L,
    mode: TransportMode,
) -> SessionTransportInitiatorBuilder<'static, L> {
    initiator_on(link, mode)
}
141
/// Starts an acceptor-side session builder over a single raw `link`.
pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
    SessionTransportAcceptorBuilder::new(link)
}
145
/// Alias for [`acceptor_on`]: forwards the link unchanged.
pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
    acceptor_on(link)
}
149
150#[derive(Clone, Default)]
151pub struct SessionRegistry {
152    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
153}
154
155impl SessionRegistry {
156    fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
157        self.inner
158            .lock()
159            .expect("session registry poisoned")
160            .get(key)
161            .cloned()
162    }
163
164    fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
165        self.inner
166            .lock()
167            .expect("session registry poisoned")
168            .insert(key, handle);
169    }
170
171    fn remove(&self, key: &SessionResumeKey) {
172        self.inner
173            .lock()
174            .expect("session registry poisoned")
175            .remove(key);
176    }
177}
178
/// Result of accepting a session on the acceptor side.
///
/// NOTE(review): the code that produces these variants is not visible in this
/// part of the file; the descriptions below are inferred from the variant
/// names and payloads and should be confirmed against the acceptor builder.
pub enum SessionAcceptOutcome<Client> {
    /// Carries a client value and the handle of the session.
    Established(Client, SessionHandle),
    /// Carries no payload; presumably an existing session was resumed.
    Resumed,
}
183
184pub struct SessionInitiatorBuilder<'a, C> {
185    conduit: C,
186    handshake_result: HandshakeResult,
187    root_settings: ConnectionSettings,
188    metadata: Metadata<'a>,
189    on_connection: Option<Box<dyn ConnectionAcceptor>>,
190    keepalive: Option<SessionKeepaliveConfig>,
191    resumable: bool,
192    recoverer: Option<Box<dyn ConduitRecoverer>>,
193    operation_store: Option<Arc<dyn OperationStore>>,
194    spawn_fn: SpawnFn,
195}
196
197impl<'a, C> SessionInitiatorBuilder<'a, C> {
198    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
199        let root_settings = handshake_result.our_settings.clone();
200        Self {
201            conduit,
202            handshake_result,
203            root_settings,
204            metadata: vec![],
205            on_connection: None,
206            keepalive: None,
207            resumable: false,
208            recoverer: None,
209            operation_store: None,
210            spawn_fn: default_spawn_fn(),
211        }
212    }
213
214    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
215        self.on_connection = Some(Box::new(acceptor));
216        self
217    }
218
219    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
220        self.keepalive = Some(keepalive);
221        self
222    }
223
224    pub fn resumable(mut self) -> Self {
225        self.resumable = true;
226        self
227    }
228
229    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
230        self.operation_store = Some(operation_store);
231        self
232    }
233
234    /// Override the function used to spawn the session background task.
235    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
236    #[cfg(not(target_arch = "wasm32"))]
237    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
238        self.spawn_fn = Box::new(f);
239        self
240    }
241
242    /// Override the function used to spawn the session background task.
243    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
244    #[cfg(target_arch = "wasm32")]
245    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
246        self.spawn_fn = Box::new(f);
247        self
248    }
249
250    pub async fn establish<Client: From<DriverCaller>>(
251        self,
252        handler: impl Handler<DriverReplySink> + 'static,
253    ) -> Result<(Client, SessionHandle), SessionError>
254    where
255        C: Conduit<Msg = MessageFamily> + 'static,
256        C::Tx: MaybeSend + MaybeSync + 'static,
257        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
258        C::Rx: MaybeSend + 'static,
259    {
260        let Self {
261            conduit,
262            handshake_result,
263            root_settings,
264            metadata: _metadata,
265            on_connection,
266            keepalive,
267            resumable,
268            recoverer,
269            operation_store,
270            spawn_fn,
271        } = self;
272        validate_negotiated_root_settings(&root_settings, &handshake_result)?;
273        let (tx, rx) = conduit.split();
274        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
275        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
276        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
277        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
278        let mut session = Session::pre_handshake(
279            tx,
280            rx,
281            on_connection,
282            open_rx,
283            close_rx,
284            resume_rx,
285            control_tx.clone(),
286            control_rx,
287            keepalive,
288            resumable,
289            recoverer,
290        );
291        let handle = session.establish_from_handshake(handshake_result)?;
292        let resume_key = session.resume_key();
293        let session_handle = SessionHandle {
294            open_tx,
295            close_tx,
296            resume_tx,
297            control_tx,
298            resume_key,
299        };
300        let mut driver = match operation_store {
301            Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
302            None => Driver::new(handle, handler),
303        };
304        let client = Client::from(driver.caller());
305        (spawn_fn)(Box::pin(async move { session.run().await }));
306        #[cfg(not(target_arch = "wasm32"))]
307        tokio::spawn(async move { driver.run().await });
308        #[cfg(target_arch = "wasm32")]
309        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
310        Ok((client, session_handle))
311    }
312}
313
314#[cfg(not(target_arch = "wasm32"))]
315pub struct SessionSourceInitiatorBuilder<'a, S> {
316    source: S,
317    mode: TransportMode,
318    root_settings: ConnectionSettings,
319    metadata: Metadata<'a>,
320    on_connection: Option<Box<dyn ConnectionAcceptor>>,
321    keepalive: Option<SessionKeepaliveConfig>,
322    resumable: bool,
323    operation_store: Option<Arc<dyn OperationStore>>,
324    spawn_fn: SpawnFn,
325}
326
327#[cfg(not(target_arch = "wasm32"))]
328impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
329    fn new(source: S, mode: TransportMode) -> Self {
330        Self {
331            source,
332            mode,
333            root_settings: ConnectionSettings {
334                parity: Parity::Odd,
335                max_concurrent_requests: 64,
336            },
337            metadata: vec![],
338            on_connection: None,
339            keepalive: None,
340            resumable: true,
341            operation_store: None,
342            spawn_fn: default_spawn_fn(),
343        }
344    }
345
346    pub fn parity(mut self, parity: Parity) -> Self {
347        self.root_settings.parity = parity;
348        self
349    }
350
351    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
352        self.root_settings = settings;
353        self
354    }
355
356    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
357        self.root_settings.max_concurrent_requests = max_concurrent_requests;
358        self
359    }
360
361    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
362        self.metadata = metadata;
363        self
364    }
365
366    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
367        self.on_connection = Some(Box::new(acceptor));
368        self
369    }
370
371    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
372        self.keepalive = Some(keepalive);
373        self
374    }
375
376    pub fn resumable(mut self) -> Self {
377        self.resumable = true;
378        self
379    }
380
381    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
382        self.operation_store = Some(operation_store);
383        self
384    }
385
386    #[cfg(not(target_arch = "wasm32"))]
387    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
388        self.spawn_fn = Box::new(f);
389        self
390    }
391
392    #[cfg(target_arch = "wasm32")]
393    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
394        self.spawn_fn = Box::new(f);
395        self
396    }
397
398    pub async fn establish<Client: From<DriverCaller>>(
399        self,
400        handler: impl Handler<DriverReplySink> + 'static,
401    ) -> Result<(Client, SessionHandle), SessionError>
402    where
403        S: LinkSource,
404        S::Link: Link + Send + 'static,
405        <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
406        <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
407        <S::Link as Link>::Rx: MaybeSend + Send + 'static,
408    {
409        let Self {
410            mut source,
411            mode,
412            root_settings,
413            metadata,
414            on_connection,
415            keepalive,
416            resumable,
417            operation_store,
418            spawn_fn,
419        } = self;
420
421        match mode {
422            TransportMode::Bare => {
423                let attachment = source.next_link().await.map_err(SessionError::Io)?;
424                let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
425                    .await
426                    .map_err(session_error_from_transport)?;
427                let handshake_result = handshake_as_initiator(
428                    &link.tx,
429                    &mut link.rx,
430                    root_settings.clone(),
431                    true,
432                    None,
433                )
434                .await
435                .map_err(session_error_from_handshake)?;
436                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
437                    .map_err(SessionError::Protocol)?;
438                let builder = SessionInitiatorBuilder::new(
439                    BareConduit::with_message_plan(link, message_plan),
440                    handshake_result,
441                );
442                let recoverer = Box::new(BareSourceRecoverer {
443                    source,
444                    settings: root_settings.clone(),
445                });
446                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
447                    builder,
448                    root_settings,
449                    metadata,
450                    on_connection,
451                    keepalive,
452                    resumable,
453                    Some(recoverer),
454                    operation_store,
455                    spawn_fn,
456                )
457                .establish(handler)
458                .await
459            }
460            TransportMode::Stable => {
461                // Get first link and do transport + CBOR handshake before
462                // handing to the stable conduit.
463                let attachment = source.next_link().await.map_err(SessionError::Io)?;
464                let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
465                    .await
466                    .map_err(session_error_from_transport)?;
467                let handshake_result = handshake_as_initiator(
468                    &link.tx,
469                    &mut link.rx,
470                    root_settings.clone(),
471                    true,
472                    None,
473                )
474                .await
475                .map_err(session_error_from_handshake)?;
476                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
477                    .map_err(SessionError::Protocol)?;
478                let conduit = StableConduit::<MessageFamily, _>::with_first_link(
479                    link.tx,
480                    link.rx,
481                    None, // initiator side — no ClientHello
482                    TransportedLinkSource {
483                        source,
484                        mode: TransportMode::Stable,
485                    },
486                )
487                .await
488                .map_err(|error| {
489                    SessionError::Protocol(format!("stable conduit setup failed: {error}"))
490                })?
491                .with_message_plan(message_plan);
492                let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
493                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
494                    builder,
495                    root_settings,
496                    metadata,
497                    on_connection,
498                    keepalive,
499                    resumable,
500                    None,
501                    operation_store,
502                    spawn_fn,
503                )
504                .establish(handler)
505                .await
506            }
507        }
508    }
509}
510
511pub struct SessionTransportInitiatorBuilder<'a, L> {
512    link: L,
513    mode: TransportMode,
514    root_settings: ConnectionSettings,
515    metadata: Metadata<'a>,
516    on_connection: Option<Box<dyn ConnectionAcceptor>>,
517    keepalive: Option<SessionKeepaliveConfig>,
518    resumable: bool,
519    operation_store: Option<Arc<dyn OperationStore>>,
520    spawn_fn: SpawnFn,
521}
522
523impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
524    fn new(link: L, mode: TransportMode) -> Self {
525        Self {
526            link,
527            mode,
528            root_settings: ConnectionSettings {
529                parity: Parity::Odd,
530                max_concurrent_requests: 64,
531            },
532            metadata: vec![],
533            on_connection: None,
534            keepalive: None,
535            resumable: false,
536            operation_store: None,
537            spawn_fn: default_spawn_fn(),
538        }
539    }
540
541    pub fn parity(mut self, parity: Parity) -> Self {
542        self.root_settings.parity = parity;
543        self
544    }
545
546    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
547        self.root_settings = settings;
548        self
549    }
550
551    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
552        self.root_settings.max_concurrent_requests = max_concurrent_requests;
553        self
554    }
555
556    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
557        self.metadata = metadata;
558        self
559    }
560
561    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
562        self.on_connection = Some(Box::new(acceptor));
563        self
564    }
565
566    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
567        self.keepalive = Some(keepalive);
568        self
569    }
570
571    pub fn resumable(mut self) -> Self {
572        self.resumable = true;
573        self
574    }
575
576    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
577        self.operation_store = Some(operation_store);
578        self
579    }
580
581    #[cfg(not(target_arch = "wasm32"))]
582    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
583        self.spawn_fn = Box::new(f);
584        self
585    }
586
587    #[cfg(target_arch = "wasm32")]
588    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
589        self.spawn_fn = Box::new(f);
590        self
591    }
592
593    #[cfg(not(target_arch = "wasm32"))]
594    pub async fn establish<Client: From<DriverCaller>>(
595        self,
596        handler: impl Handler<DriverReplySink> + 'static,
597    ) -> Result<(Client, SessionHandle), SessionError>
598    where
599        L: Link + Send + 'static,
600        L::Tx: MaybeSend + MaybeSync + 'static,
601        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
602        L::Rx: MaybeSend + 'static,
603    {
604        let Self {
605            link,
606            mode,
607            root_settings,
608            metadata,
609            on_connection,
610            keepalive,
611            resumable,
612            operation_store,
613            spawn_fn,
614        } = self;
615        match mode {
616            TransportMode::Bare => {
617                let link = initiate_transport(link, TransportMode::Bare)
618                    .await
619                    .map_err(session_error_from_transport)?;
620                Self::finish_with_bare_parts(
621                    link,
622                    root_settings,
623                    metadata,
624                    on_connection,
625                    keepalive,
626                    resumable,
627                    operation_store,
628                    spawn_fn,
629                    handler,
630                )
631                .await
632            }
633            TransportMode::Stable => {
634                let link = initiate_transport(link, TransportMode::Stable)
635                    .await
636                    .map_err(session_error_from_transport)?;
637                Self::finish_with_stable_parts(
638                    link,
639                    root_settings,
640                    metadata,
641                    on_connection,
642                    keepalive,
643                    resumable,
644                    operation_store,
645                    spawn_fn,
646                    handler,
647                )
648                .await
649            }
650        }
651    }
652
653    #[cfg(target_arch = "wasm32")]
654    pub async fn establish<Client: From<DriverCaller>>(
655        self,
656        handler: impl Handler<DriverReplySink> + 'static,
657    ) -> Result<(Client, SessionHandle), SessionError>
658    where
659        L: Link + 'static,
660        L::Tx: MaybeSend + MaybeSync + 'static,
661        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
662        L::Rx: MaybeSend + 'static,
663    {
664        let Self {
665            link,
666            mode,
667            root_settings,
668            metadata,
669            on_connection,
670            keepalive,
671            resumable,
672            operation_store,
673            spawn_fn,
674        } = self;
675        match mode {
676            TransportMode::Bare => {
677                let link = initiate_transport(link, TransportMode::Bare)
678                    .await
679                    .map_err(session_error_from_transport)?;
680                Self::finish_with_bare_parts(
681                    link,
682                    root_settings,
683                    metadata,
684                    on_connection,
685                    keepalive,
686                    resumable,
687                    operation_store,
688                    spawn_fn,
689                    handler,
690                )
691                .await
692            }
693            TransportMode::Stable => Err(SessionError::Protocol(
694                "stable conduit transport selection is unsupported on wasm".into(),
695            )),
696        }
697    }
698
699    #[allow(clippy::too_many_arguments)]
700    async fn finish_with_bare_parts<Client: From<DriverCaller>>(
701        mut link: SplitLink<L::Tx, L::Rx>,
702        root_settings: ConnectionSettings,
703        metadata: Metadata<'a>,
704        on_connection: Option<Box<dyn ConnectionAcceptor>>,
705        keepalive: Option<SessionKeepaliveConfig>,
706        resumable: bool,
707        operation_store: Option<Arc<dyn OperationStore>>,
708        spawn_fn: SpawnFn,
709        handler: impl Handler<DriverReplySink> + 'static,
710    ) -> Result<(Client, SessionHandle), SessionError>
711    where
712        L: Link + 'static,
713        L::Tx: MaybeSend + MaybeSync + 'static,
714        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
715        L::Rx: MaybeSend + 'static,
716    {
717        let handshake_result =
718            handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
719                .await
720                .map_err(session_error_from_handshake)?;
721        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
722            .map_err(SessionError::Protocol)?;
723        let builder = SessionInitiatorBuilder::new(
724            BareConduit::with_message_plan(link, message_plan),
725            handshake_result,
726        );
727        Self::apply_common_parts(
728            builder,
729            root_settings,
730            metadata,
731            on_connection,
732            keepalive,
733            resumable,
734            None,
735            operation_store,
736            spawn_fn,
737        )
738        .establish(handler)
739        .await
740    }
741
742    #[cfg(not(target_arch = "wasm32"))]
743    #[allow(clippy::too_many_arguments)]
744    async fn finish_with_stable_parts<Client: From<DriverCaller>>(
745        mut link: SplitLink<L::Tx, L::Rx>,
746        root_settings: ConnectionSettings,
747        metadata: Metadata<'a>,
748        on_connection: Option<Box<dyn ConnectionAcceptor>>,
749        keepalive: Option<SessionKeepaliveConfig>,
750        resumable: bool,
751        operation_store: Option<Arc<dyn OperationStore>>,
752        spawn_fn: SpawnFn,
753        handler: impl Handler<DriverReplySink> + 'static,
754    ) -> Result<(Client, SessionHandle), SessionError>
755    where
756        L: Link + Send + 'static,
757        L::Tx: MaybeSend + MaybeSync + Send + 'static,
758        for<'p> <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
759        L::Rx: MaybeSend + Send + 'static,
760    {
761        let handshake_result =
762            handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
763                .await
764                .map_err(session_error_from_handshake)?;
765        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
766            .map_err(SessionError::Protocol)?;
767        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
768            link.tx,
769            link.rx,
770            None,
771            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
772        )
773        .await
774        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
775        .with_message_plan(message_plan);
776        let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
777        Self::apply_common_parts(
778            builder,
779            root_settings,
780            metadata,
781            on_connection,
782            keepalive,
783            resumable,
784            None,
785            operation_store,
786            spawn_fn,
787        )
788        .establish(handler)
789        .await
790    }
791
792    #[allow(clippy::too_many_arguments)]
793    #[allow(clippy::too_many_arguments)]
794    fn apply_common_parts<C>(
795        mut builder: SessionInitiatorBuilder<'a, C>,
796        root_settings: ConnectionSettings,
797        metadata: Metadata<'a>,
798        on_connection: Option<Box<dyn ConnectionAcceptor>>,
799        keepalive: Option<SessionKeepaliveConfig>,
800        resumable: bool,
801        recoverer: Option<Box<dyn ConduitRecoverer>>,
802        operation_store: Option<Arc<dyn OperationStore>>,
803        spawn_fn: SpawnFn,
804    ) -> SessionInitiatorBuilder<'a, C> {
805        builder.root_settings = root_settings;
806        builder.metadata = metadata;
807        builder.on_connection = on_connection;
808        builder.keepalive = keepalive;
809        builder.resumable = resumable;
810        builder.recoverer = recoverer;
811        builder.operation_store = operation_store;
812        builder.spawn_fn = spawn_fn;
813        builder
814    }
815}
816
817#[cfg(not(target_arch = "wasm32"))]
818struct BareSourceRecoverer<S> {
819    source: S,
820    settings: ConnectionSettings,
821}
822
823#[cfg(not(target_arch = "wasm32"))]
824const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
825#[cfg(not(target_arch = "wasm32"))]
826const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
827
828#[cfg(not(target_arch = "wasm32"))]
829impl<S> ConduitRecoverer for BareSourceRecoverer<S>
830where
831    S: LinkSource,
832    S::Link: Link + Send + 'static,
833    <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
834    <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
835    <S::Link as Link>::Rx: MaybeSend + Send + 'static,
836{
837    fn next_conduit<'a>(
838        &'a mut self,
839        resume_key: Option<&'a SessionResumeKey>,
840    ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
841        Box::pin(async move {
842            let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
843            let mut use_resume_key = resume_key.is_some();
844
845            loop {
846                let selected_resume_key = if use_resume_key { resume_key } else { None };
847
848                let result: Result<super::RecoveredConduit, SessionError> = async {
849                    let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
850                    let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
851                        .await
852                        .map_err(session_error_from_transport)?;
853                    let handshake_result = handshake_as_initiator(
854                        &link.tx,
855                        &mut link.rx,
856                        self.settings.clone(),
857                        true,
858                        selected_resume_key,
859                    )
860                    .await
861                    .map_err(session_error_from_handshake)?;
862                    let conduit = BareConduit::<MessageFamily, _>::new(link);
863                    let (tx, rx) = conduit.split();
864                    Ok(super::RecoveredConduit {
865                        tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
866                        rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
867                        handshake: handshake_result,
868                    })
869                }
870                .await;
871
872                if let Ok(conduit) = result {
873                    return Ok(conduit);
874                }
875
876                if use_resume_key {
877                    // If a resumption attempt is rejected once, continue trying without
878                    // a resume key so restart scenarios can establish a fresh session.
879                    use_resume_key = false;
880                }
881
882                tokio::time::sleep(backoff).await;
883                backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
884            }
885        })
886    }
887}
888
889#[cfg(not(target_arch = "wasm32"))]
890struct TransportedLinkSource<S> {
891    source: S,
892    mode: TransportMode,
893}
894
895#[cfg(not(target_arch = "wasm32"))]
896impl<S> LinkSource for TransportedLinkSource<S>
897where
898    S: LinkSource,
899    S::Link: Link + Send + 'static,
900    <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
901    <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
902    <S::Link as Link>::Rx: MaybeSend + Send + 'static,
903{
904    type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
905
906    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
907        let attachment = self.source.next_link().await?;
908        let link = initiate_transport(attachment.into_link(), self.mode)
909            .await
910            .map_err(std::io::Error::other)?;
911        Ok(Attachment::initiator(link))
912    }
913}
914
/// Builder for the accepting side of a session over an already-handshaken conduit.
pub struct SessionAcceptorBuilder<'a, C> {
    /// Conduit that is split into its tx/rx halves when the session is built.
    conduit: C,
    /// Result of the handshake that was performed before this builder was created.
    handshake_result: HandshakeResult,
    /// Settings that `handshake_result.our_settings` must equal; checked in `establish`.
    root_settings: ConnectionSettings,
    /// Metadata carried by the builder; `establish` does not read it.
    metadata: Metadata<'a>,
    /// Optional connection acceptor passed to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumable flag passed to the session.
    resumable: bool,
    /// Optional registry used to record new session handles by resume key and
    /// to look up existing sessions in `establish_or_resume`.
    session_registry: Option<SessionRegistry>,
    /// Optional operation store; when set, the driver is built with it.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Function used to spawn the session's run future.
    spawn_fn: SpawnFn,
}
927
928impl<'a, C> SessionAcceptorBuilder<'a, C> {
929    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
930        let root_settings = handshake_result.our_settings.clone();
931        Self {
932            conduit,
933            handshake_result,
934            root_settings,
935            metadata: vec![],
936            on_connection: None,
937            keepalive: None,
938            resumable: false,
939            session_registry: None,
940            operation_store: None,
941            spawn_fn: default_spawn_fn(),
942        }
943    }
944
945    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
946        self.on_connection = Some(Box::new(acceptor));
947        self
948    }
949
950    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
951        self.keepalive = Some(keepalive);
952        self
953    }
954
955    pub fn resumable(mut self) -> Self {
956        self.resumable = true;
957        self
958    }
959
960    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
961        self.session_registry = Some(session_registry);
962        self
963    }
964
965    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
966        self.operation_store = Some(operation_store);
967        self
968    }
969
970    /// Override the function used to spawn the session background task.
971    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
972    #[cfg(not(target_arch = "wasm32"))]
973    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
974        self.spawn_fn = Box::new(f);
975        self
976    }
977
978    /// Override the function used to spawn the session background task.
979    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
980    #[cfg(target_arch = "wasm32")]
981    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
982        self.spawn_fn = Box::new(f);
983        self
984    }
985
986    #[moire::instrument]
987    pub async fn establish<Client: From<DriverCaller>>(
988        self,
989        handler: impl Handler<DriverReplySink> + 'static,
990    ) -> Result<(Client, SessionHandle), SessionError>
991    where
992        C: Conduit<Msg = MessageFamily> + 'static,
993        C::Tx: MaybeSend + MaybeSync + 'static,
994        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
995        C::Rx: MaybeSend + 'static,
996    {
997        let Self {
998            conduit,
999            handshake_result,
1000            root_settings,
1001            metadata: _metadata,
1002            on_connection,
1003            keepalive,
1004            resumable,
1005            session_registry,
1006            operation_store,
1007            spawn_fn,
1008        } = self;
1009        validate_negotiated_root_settings(&root_settings, &handshake_result)?;
1010        let (tx, rx) = conduit.split();
1011        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
1012        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
1013        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
1014        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
1015        let mut session = Session::pre_handshake(
1016            tx,
1017            rx,
1018            on_connection,
1019            open_rx,
1020            close_rx,
1021            resume_rx,
1022            control_tx.clone(),
1023            control_rx,
1024            keepalive,
1025            resumable,
1026            None,
1027        );
1028        let handle = session.establish_from_handshake(handshake_result)?;
1029        let resume_key = session.resume_key();
1030        let session_handle = SessionHandle {
1031            open_tx,
1032            close_tx,
1033            resume_tx,
1034            control_tx,
1035            resume_key,
1036        };
1037        if let (Some(registry), Some(key)) = (&session_registry, resume_key) {
1038            registry.insert(key, session_handle.clone());
1039        }
1040        let mut driver = match operation_store {
1041            Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
1042            None => Driver::new(handle, handler),
1043        };
1044        let client = Client::from(driver.caller());
1045        (spawn_fn)(Box::pin(async move { session.run().await }));
1046        #[cfg(not(target_arch = "wasm32"))]
1047        tokio::spawn(async move { driver.run().await });
1048        #[cfg(target_arch = "wasm32")]
1049        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
1050        Ok((client, session_handle))
1051    }
1052
1053    #[moire::instrument]
1054    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1055        self,
1056        handler: impl Handler<DriverReplySink> + 'static,
1057    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1058    where
1059        C: Conduit<Msg = MessageFamily> + 'static,
1060        C::Tx: MaybeSend + MaybeSync + 'static,
1061        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
1062        C::Rx: MaybeSend + 'static,
1063    {
1064        // With the CBOR handshake, resume detection happens at the link level
1065        // before the conduit is created. If the peer sent a resume key in the Hello
1066        // that matches a known session, we resume. Otherwise, we establish.
1067        if let (Some(registry), Some(resume_key)) = (
1068            &self.session_registry,
1069            self.handshake_result.peer_resume_key,
1070        ) {
1071            if let Some(handle) = registry.get(&resume_key) {
1072                let (tx, rx) = self.conduit.split();
1073                if let Err(error) = handle
1074                    .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1075                    .await
1076                {
1077                    registry.remove(&resume_key);
1078                    return Err(error);
1079                }
1080                return Ok(SessionAcceptOutcome::Resumed);
1081            }
1082        }
1083
1084        let (client, session_handle) = self.establish(handler).await?;
1085        Ok(SessionAcceptOutcome::Established(client, session_handle))
1086    }
1087}
1088
/// Builder for the accepting side of a session that starts from a raw link:
/// the transport is accepted and the handshake performed before the session is built.
pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
    /// Raw link on which the transport is accepted.
    link: L,
    /// Root connection settings passed to the acceptor handshake.
    root_settings: ConnectionSettings,
    /// Metadata forwarded to the inner `SessionAcceptorBuilder`.
    metadata: Metadata<'a>,
    /// Optional connection acceptor forwarded to the inner builder.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration forwarded to the inner builder.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumable flag passed to the handshake and forwarded to the inner builder.
    resumable: bool,
    /// Optional session registry forwarded to the inner builder.
    session_registry: Option<SessionRegistry>,
    /// Optional operation store forwarded to the inner builder.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Spawn function forwarded to the inner builder.
    spawn_fn: SpawnFn,
}
1100
1101impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1102    fn new(link: L) -> Self {
1103        Self {
1104            link,
1105            root_settings: ConnectionSettings {
1106                parity: Parity::Even,
1107                max_concurrent_requests: 64,
1108            },
1109            metadata: vec![],
1110            on_connection: None,
1111            keepalive: None,
1112            resumable: true,
1113            session_registry: None,
1114            operation_store: None,
1115            spawn_fn: default_spawn_fn(),
1116        }
1117    }
1118
1119    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1120        self.root_settings = settings;
1121        self
1122    }
1123
1124    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1125        self.root_settings.max_concurrent_requests = max_concurrent_requests;
1126        self
1127    }
1128
1129    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1130        self.metadata = metadata;
1131        self
1132    }
1133
1134    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1135        self.on_connection = Some(Box::new(acceptor));
1136        self
1137    }
1138
1139    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1140        self.keepalive = Some(keepalive);
1141        self
1142    }
1143
1144    pub fn resumable(mut self) -> Self {
1145        self.resumable = true;
1146        self
1147    }
1148
1149    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1150        self.session_registry = Some(session_registry);
1151        self
1152    }
1153
1154    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1155        self.operation_store = Some(operation_store);
1156        self
1157    }
1158
1159    #[cfg(not(target_arch = "wasm32"))]
1160    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1161        self.spawn_fn = Box::new(f);
1162        self
1163    }
1164
1165    #[cfg(target_arch = "wasm32")]
1166    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1167        self.spawn_fn = Box::new(f);
1168        self
1169    }
1170
1171    #[moire::instrument]
1172    #[cfg(not(target_arch = "wasm32"))]
1173    pub async fn establish<Client: From<DriverCaller>>(
1174        self,
1175        handler: impl Handler<DriverReplySink> + 'static,
1176    ) -> Result<(Client, SessionHandle), SessionError>
1177    where
1178        L: Link + Send + 'static,
1179        L::Tx: MaybeSend + MaybeSync + 'static,
1180        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1181        L::Rx: MaybeSend + 'static,
1182    {
1183        let Self {
1184            link,
1185            root_settings,
1186            metadata,
1187            on_connection,
1188            keepalive,
1189            resumable,
1190            session_registry,
1191            operation_store,
1192            spawn_fn,
1193        } = self;
1194        let (mode, mut link) = accept_transport(link)
1195            .await
1196            .map_err(session_error_from_transport)?;
1197        match mode {
1198            TransportMode::Bare => {
1199                let handshake_result = handshake_as_acceptor(
1200                    &link.tx,
1201                    &mut link.rx,
1202                    root_settings.clone(),
1203                    true,
1204                    resumable,
1205                    None,
1206                )
1207                .await
1208                .map_err(session_error_from_handshake)?;
1209                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1210                    .map_err(SessionError::Protocol)?;
1211                let builder = SessionAcceptorBuilder::new(
1212                    BareConduit::with_message_plan(link, message_plan),
1213                    handshake_result,
1214                );
1215                Self::apply_common_parts(
1216                    builder,
1217                    root_settings,
1218                    metadata,
1219                    on_connection,
1220                    keepalive,
1221                    resumable,
1222                    session_registry,
1223                    operation_store,
1224                    spawn_fn,
1225                )
1226                .establish(handler)
1227                .await
1228            }
1229            TransportMode::Stable => {
1230                Self::finish_with_stable_parts(
1231                    link,
1232                    root_settings,
1233                    metadata,
1234                    on_connection,
1235                    keepalive,
1236                    resumable,
1237                    session_registry,
1238                    operation_store,
1239                    spawn_fn,
1240                    handler,
1241                )
1242                .await
1243            }
1244        }
1245    }
1246
1247    #[moire::instrument]
1248    #[cfg(not(target_arch = "wasm32"))]
1249    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1250        self,
1251        handler: impl Handler<DriverReplySink> + 'static,
1252    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1253    where
1254        L: Link + Send + 'static,
1255        L::Tx: MaybeSend + MaybeSync + 'static,
1256        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1257        L::Rx: MaybeSend + 'static,
1258    {
1259        let Self {
1260            link,
1261            root_settings,
1262            metadata,
1263            on_connection,
1264            keepalive,
1265            resumable,
1266            session_registry,
1267            operation_store,
1268            spawn_fn,
1269        } = self;
1270        let (mode, mut link) = accept_transport(link)
1271            .await
1272            .map_err(session_error_from_transport)?;
1273        match mode {
1274            TransportMode::Bare => {
1275                let handshake_result = handshake_as_acceptor(
1276                    &link.tx,
1277                    &mut link.rx,
1278                    root_settings.clone(),
1279                    true,
1280                    resumable,
1281                    None,
1282                )
1283                .await
1284                .map_err(session_error_from_handshake)?;
1285                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1286                    .map_err(SessionError::Protocol)?;
1287                let builder = SessionAcceptorBuilder::new(
1288                    BareConduit::with_message_plan(link, message_plan),
1289                    handshake_result,
1290                );
1291                Self::apply_common_parts(
1292                    builder,
1293                    root_settings,
1294                    metadata,
1295                    on_connection,
1296                    keepalive,
1297                    resumable,
1298                    session_registry,
1299                    operation_store,
1300                    spawn_fn,
1301                )
1302                .establish_or_resume(handler)
1303                .await
1304            }
1305            TransportMode::Stable => Self::finish_with_stable_parts(
1306                link,
1307                root_settings,
1308                metadata,
1309                on_connection,
1310                keepalive,
1311                resumable,
1312                session_registry,
1313                operation_store,
1314                spawn_fn,
1315                handler,
1316            )
1317            .await
1318            .map(|(client, handle)| SessionAcceptOutcome::Established(client, handle)),
1319        }
1320    }
1321
1322    #[cfg(target_arch = "wasm32")]
1323    pub async fn establish<Client: From<DriverCaller>>(
1324        self,
1325        handler: impl Handler<DriverReplySink> + 'static,
1326    ) -> Result<(Client, SessionHandle), SessionError>
1327    where
1328        L: Link + 'static,
1329        L::Tx: MaybeSend + MaybeSync + 'static,
1330        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1331        L::Rx: MaybeSend + 'static,
1332    {
1333        let Self {
1334            link,
1335            root_settings,
1336            metadata,
1337            on_connection,
1338            keepalive,
1339            resumable,
1340            session_registry,
1341            operation_store,
1342            spawn_fn,
1343        } = self;
1344        let (mode, mut link) = accept_transport(link)
1345            .await
1346            .map_err(session_error_from_transport)?;
1347        match mode {
1348            TransportMode::Bare => {
1349                let handshake_result = handshake_as_acceptor(
1350                    &link.tx,
1351                    &mut link.rx,
1352                    root_settings.clone(),
1353                    true,
1354                    resumable,
1355                    None,
1356                )
1357                .await
1358                .map_err(session_error_from_handshake)?;
1359                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1360                    .map_err(SessionError::Protocol)?;
1361                let builder = SessionAcceptorBuilder::new(
1362                    BareConduit::with_message_plan(link, message_plan),
1363                    handshake_result,
1364                );
1365                Self::apply_common_parts(
1366                    builder,
1367                    root_settings,
1368                    metadata,
1369                    on_connection,
1370                    keepalive,
1371                    resumable,
1372                    session_registry,
1373                    operation_store,
1374                    spawn_fn,
1375                )
1376                .establish(handler)
1377                .await
1378            }
1379            TransportMode::Stable => Err(SessionError::Protocol(
1380                "stable conduit transport selection is unsupported on wasm".into(),
1381            )),
1382        }
1383    }
1384
1385    #[cfg(target_arch = "wasm32")]
1386    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1387        self,
1388        handler: impl Handler<DriverReplySink> + 'static,
1389    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1390    where
1391        L: Link + 'static,
1392        L::Tx: MaybeSend + MaybeSync + 'static,
1393        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1394        L::Rx: MaybeSend + 'static,
1395    {
1396        let Self {
1397            link,
1398            root_settings,
1399            metadata,
1400            on_connection,
1401            keepalive,
1402            resumable,
1403            session_registry,
1404            operation_store,
1405            spawn_fn,
1406        } = self;
1407        let (mode, mut link) = accept_transport(link)
1408            .await
1409            .map_err(session_error_from_transport)?;
1410        match mode {
1411            TransportMode::Bare => {
1412                let handshake_result = handshake_as_acceptor(
1413                    &link.tx,
1414                    &mut link.rx,
1415                    root_settings.clone(),
1416                    true,
1417                    resumable,
1418                    None,
1419                )
1420                .await
1421                .map_err(session_error_from_handshake)?;
1422                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1423                    .map_err(SessionError::Protocol)?;
1424                let builder = SessionAcceptorBuilder::new(
1425                    BareConduit::with_message_plan(link, message_plan),
1426                    handshake_result,
1427                );
1428                Self::apply_common_parts(
1429                    builder,
1430                    root_settings,
1431                    metadata,
1432                    on_connection,
1433                    keepalive,
1434                    resumable,
1435                    session_registry,
1436                    operation_store,
1437                    spawn_fn,
1438                )
1439                .establish_or_resume(handler)
1440                .await
1441            }
1442            TransportMode::Stable => Err(SessionError::Protocol(
1443                "stable conduit transport selection is unsupported on wasm".into(),
1444            )),
1445        }
1446    }
1447
1448    #[cfg(not(target_arch = "wasm32"))]
1449    #[allow(clippy::too_many_arguments)]
1450    async fn finish_with_stable_parts<Client: From<DriverCaller>>(
1451        mut link: SplitLink<L::Tx, L::Rx>,
1452        root_settings: ConnectionSettings,
1453        metadata: Metadata<'a>,
1454        on_connection: Option<Box<dyn ConnectionAcceptor>>,
1455        keepalive: Option<SessionKeepaliveConfig>,
1456        resumable: bool,
1457        session_registry: Option<SessionRegistry>,
1458        operation_store: Option<Arc<dyn OperationStore>>,
1459        spawn_fn: SpawnFn,
1460        handler: impl Handler<DriverReplySink> + 'static,
1461    ) -> Result<(Client, SessionHandle), SessionError>
1462    where
1463        L: Link + Send + 'static,
1464        L::Tx: MaybeSend + MaybeSync + Send + 'static,
1465        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1466        L::Rx: MaybeSend + Send + 'static,
1467    {
1468        let handshake_result = handshake_as_acceptor(
1469            &link.tx,
1470            &mut link.rx,
1471            root_settings.clone(),
1472            true,
1473            resumable,
1474            None,
1475        )
1476        .await
1477        .map_err(session_error_from_handshake)?;
1478        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1479            .map_err(SessionError::Protocol)?;
1480        // Read the stable conduit's ClientHello — the initiator sends it
1481        // after the CBOR session handshake completes.
1482        let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1483            .await
1484            .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1485        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1486            link.tx,
1487            link.rx,
1488            Some(client_hello),
1489            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1490        )
1491        .await
1492        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1493        .with_message_plan(message_plan);
1494        let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1495        Self::apply_common_parts(
1496            builder,
1497            root_settings,
1498            metadata,
1499            on_connection,
1500            keepalive,
1501            resumable,
1502            session_registry,
1503            operation_store,
1504            spawn_fn,
1505        )
1506        .establish(handler)
1507        .await
1508    }
1509
1510    #[allow(clippy::too_many_arguments)]
1511    #[allow(clippy::too_many_arguments)]
1512    fn apply_common_parts<C>(
1513        mut builder: SessionAcceptorBuilder<'a, C>,
1514        root_settings: ConnectionSettings,
1515        metadata: Metadata<'a>,
1516        on_connection: Option<Box<dyn ConnectionAcceptor>>,
1517        keepalive: Option<SessionKeepaliveConfig>,
1518        resumable: bool,
1519        session_registry: Option<SessionRegistry>,
1520        operation_store: Option<Arc<dyn OperationStore>>,
1521        spawn_fn: SpawnFn,
1522    ) -> SessionAcceptorBuilder<'a, C> {
1523        builder.root_settings = root_settings;
1524        builder.metadata = metadata;
1525        builder.on_connection = on_connection;
1526        builder.keepalive = keepalive;
1527        builder.resumable = resumable;
1528        builder.session_registry = session_registry;
1529        builder.operation_store = operation_store;
1530        builder.spawn_fn = spawn_fn;
1531        builder
1532    }
1533}
1534
1535fn validate_negotiated_root_settings(
1536    expected_root_settings: &ConnectionSettings,
1537    handshake_result: &HandshakeResult,
1538) -> Result<(), SessionError> {
1539    if handshake_result.our_settings != *expected_root_settings {
1540        return Err(SessionError::Protocol(
1541            "negotiated root settings do not match builder settings".into(),
1542        ));
1543    }
1544    Ok(())
1545}
1546
1547fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1548    match error {
1549        crate::HandshakeError::Io(io) => SessionError::Io(io),
1550        crate::HandshakeError::PeerClosed => {
1551            SessionError::Protocol("peer closed during handshake".into())
1552        }
1553        crate::HandshakeError::NotResumable => SessionError::NotResumable,
1554        other => SessionError::Protocol(other.to_string()),
1555    }
1556}
1557
1558fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1559    match error {
1560        crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1561        crate::TransportPrologueError::LinkDead => {
1562            SessionError::Protocol("link closed during transport prologue".into())
1563        }
1564        crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1565        crate::TransportPrologueError::Rejected(reason) => {
1566            SessionError::Protocol(format!("transport rejected: {reason}"))
1567        }
1568    }
1569}