Skip to main content

vox_core/session/
builders.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6};
7
8use moire::sync::mpsc;
9use vox_types::{
10    Conduit, ConduitTx, ConnectionSettings, Handler, HandshakeResult, Link, MaybeSend, MaybeSync,
11    MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink,
12};
13
14#[cfg(not(target_arch = "wasm32"))]
15use crate::{Attachment, LinkSource, StableConduit};
16use crate::{
17    BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
18    handshake_as_acceptor, handshake_as_initiator, initiate_transport,
19};
20
21use super::{
22    CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
23    SessionHandle, SessionKeepaliveConfig,
24};
25use crate::{Driver, DriverCaller, DriverReplySink};
26
27/// A pinned, boxed session future. On non-WASM this is `Send + 'static`;
28/// on WASM it's `'static` only (no `Send` requirement).
29#[cfg(not(target_arch = "wasm32"))]
30pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
31#[cfg(target_arch = "wasm32")]
32pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
33
34#[cfg(not(target_arch = "wasm32"))]
35type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
36#[cfg(target_arch = "wasm32")]
37type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
38
39#[cfg(not(target_arch = "wasm32"))]
40fn default_spawn_fn() -> SpawnFn {
41    Box::new(|fut| {
42        tokio::spawn(fut);
43    })
44}
45
46#[cfg(target_arch = "wasm32")]
47fn default_spawn_fn() -> SpawnFn {
48    Box::new(|fut| {
49        wasm_bindgen_futures::spawn_local(fut);
50    })
51}
52
53// r[impl rpc.session-setup]
54// r[impl session.role]
55pub fn initiator_conduit<I: IntoConduit>(
56    into_conduit: I,
57    handshake_result: HandshakeResult,
58) -> SessionInitiatorBuilder<'static, I::Conduit> {
59    SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
60}
61
62#[cfg(not(target_arch = "wasm32"))]
63pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
64where
65    S: LinkSource,
66{
67    SessionSourceInitiatorBuilder::new(source, mode)
68}
69
70pub fn acceptor_conduit<I: IntoConduit>(
71    into_conduit: I,
72    handshake_result: HandshakeResult,
73) -> SessionAcceptorBuilder<'static, I::Conduit> {
74    SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
75}
76
77/// Convenience: perform CBOR handshake as initiator on a raw link, then return
78/// a builder with the conduit ready to go.
79pub async fn initiator_on_link<L: Link>(
80    link: L,
81    settings: ConnectionSettings,
82) -> Result<
83    SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
84    SessionError,
85>
86where
87    L::Tx: MaybeSend + MaybeSync + 'static,
88    L::Rx: MaybeSend + 'static,
89{
90    let (tx, mut rx) = link.split();
91    let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None)
92        .await
93        .map_err(session_error_from_handshake)?;
94    let message_plan =
95        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
96    Ok(SessionInitiatorBuilder::new(
97        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
98        handshake_result,
99    ))
100}
101
102/// Convenience: perform CBOR handshake as acceptor on a raw link, then return
103/// a builder with the conduit ready to go.
104pub async fn acceptor_on_link<L: Link>(
105    link: L,
106    settings: ConnectionSettings,
107) -> Result<
108    SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
109    SessionError,
110>
111where
112    L::Tx: MaybeSend + MaybeSync + 'static,
113    L::Rx: MaybeSend + 'static,
114{
115    let (tx, mut rx) = link.split();
116    let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None)
117        .await
118        .map_err(session_error_from_handshake)?;
119    let message_plan =
120        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
121    Ok(SessionAcceptorBuilder::new(
122        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
123        handshake_result,
124    ))
125}
126
127pub fn initiator_on<L: Link>(
128    link: L,
129    mode: TransportMode,
130) -> SessionTransportInitiatorBuilder<'static, L> {
131    SessionTransportInitiatorBuilder::new(link, mode)
132}
133
134pub fn initiator_transport<L: Link>(
135    link: L,
136    mode: TransportMode,
137) -> SessionTransportInitiatorBuilder<'static, L> {
138    initiator_on(link, mode)
139}
140
141pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
142    SessionTransportAcceptorBuilder::new(link)
143}
144
145pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
146    acceptor_on(link)
147}
148
149#[derive(Clone, Default)]
150pub struct SessionRegistry {
151    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
152}
153
154impl SessionRegistry {
155    fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
156        self.inner
157            .lock()
158            .expect("session registry poisoned")
159            .get(key)
160            .cloned()
161    }
162
163    fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
164        self.inner
165            .lock()
166            .expect("session registry poisoned")
167            .insert(key, handle);
168    }
169
170    fn remove(&self, key: &SessionResumeKey) {
171        self.inner
172            .lock()
173            .expect("session registry poisoned")
174            .remove(key);
175    }
176}
177
178pub enum SessionAcceptOutcome<Client> {
179    Established(Client, SessionHandle),
180    Resumed,
181}
182
183pub struct SessionInitiatorBuilder<'a, C> {
184    conduit: C,
185    handshake_result: HandshakeResult,
186    root_settings: ConnectionSettings,
187    metadata: Metadata<'a>,
188    on_connection: Option<Box<dyn ConnectionAcceptor>>,
189    keepalive: Option<SessionKeepaliveConfig>,
190    resumable: bool,
191    recoverer: Option<Box<dyn ConduitRecoverer>>,
192    operation_store: Option<Arc<dyn OperationStore>>,
193    spawn_fn: SpawnFn,
194}
195
196impl<'a, C> SessionInitiatorBuilder<'a, C> {
197    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
198        let root_settings = handshake_result.our_settings.clone();
199        Self {
200            conduit,
201            handshake_result,
202            root_settings,
203            metadata: vec![],
204            on_connection: None,
205            keepalive: None,
206            resumable: false,
207            recoverer: None,
208            operation_store: None,
209            spawn_fn: default_spawn_fn(),
210        }
211    }
212
213    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
214        self.on_connection = Some(Box::new(acceptor));
215        self
216    }
217
218    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
219        self.keepalive = Some(keepalive);
220        self
221    }
222
223    pub fn resumable(mut self) -> Self {
224        self.resumable = true;
225        self
226    }
227
228    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
229        self.operation_store = Some(operation_store);
230        self
231    }
232
233    /// Override the function used to spawn the session background task.
234    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
235    #[cfg(not(target_arch = "wasm32"))]
236    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
237        self.spawn_fn = Box::new(f);
238        self
239    }
240
241    /// Override the function used to spawn the session background task.
242    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
243    #[cfg(target_arch = "wasm32")]
244    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
245        self.spawn_fn = Box::new(f);
246        self
247    }
248
249    pub async fn establish<Client: From<DriverCaller>>(
250        self,
251        handler: impl Handler<DriverReplySink> + 'static,
252    ) -> Result<(Client, SessionHandle), SessionError>
253    where
254        C: Conduit<Msg = MessageFamily> + 'static,
255        C::Tx: MaybeSend + MaybeSync + 'static,
256        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
257        C::Rx: MaybeSend + 'static,
258    {
259        let Self {
260            conduit,
261            handshake_result,
262            root_settings,
263            metadata: _metadata,
264            on_connection,
265            keepalive,
266            resumable,
267            recoverer,
268            operation_store,
269            spawn_fn,
270        } = self;
271        validate_negotiated_root_settings(&root_settings, &handshake_result)?;
272        let (tx, rx) = conduit.split();
273        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
274        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
275        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
276        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
277        let mut session = Session::pre_handshake(
278            tx,
279            rx,
280            on_connection,
281            open_rx,
282            close_rx,
283            resume_rx,
284            control_tx.clone(),
285            control_rx,
286            keepalive,
287            resumable,
288            recoverer,
289        );
290        let handle = session.establish_from_handshake(handshake_result)?;
291        let resume_key = session.resume_key();
292        let session_handle = SessionHandle {
293            open_tx,
294            close_tx,
295            resume_tx,
296            control_tx,
297            resume_key,
298        };
299        let mut driver = match operation_store {
300            Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
301            None => Driver::new(handle, handler),
302        };
303        let client = Client::from(driver.caller());
304        (spawn_fn)(Box::pin(async move { session.run().await }));
305        #[cfg(not(target_arch = "wasm32"))]
306        tokio::spawn(async move { driver.run().await });
307        #[cfg(target_arch = "wasm32")]
308        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
309        Ok((client, session_handle))
310    }
311}
312
313#[cfg(not(target_arch = "wasm32"))]
314pub struct SessionSourceInitiatorBuilder<'a, S> {
315    source: S,
316    mode: TransportMode,
317    root_settings: ConnectionSettings,
318    metadata: Metadata<'a>,
319    on_connection: Option<Box<dyn ConnectionAcceptor>>,
320    keepalive: Option<SessionKeepaliveConfig>,
321    resumable: bool,
322    operation_store: Option<Arc<dyn OperationStore>>,
323    spawn_fn: SpawnFn,
324}
325
326#[cfg(not(target_arch = "wasm32"))]
327impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
328    fn new(source: S, mode: TransportMode) -> Self {
329        Self {
330            source,
331            mode,
332            root_settings: ConnectionSettings {
333                parity: Parity::Odd,
334                max_concurrent_requests: 64,
335            },
336            metadata: vec![],
337            on_connection: None,
338            keepalive: None,
339            resumable: true,
340            operation_store: None,
341            spawn_fn: default_spawn_fn(),
342        }
343    }
344
345    pub fn parity(mut self, parity: Parity) -> Self {
346        self.root_settings.parity = parity;
347        self
348    }
349
350    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
351        self.root_settings = settings;
352        self
353    }
354
355    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
356        self.root_settings.max_concurrent_requests = max_concurrent_requests;
357        self
358    }
359
360    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
361        self.metadata = metadata;
362        self
363    }
364
365    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
366        self.on_connection = Some(Box::new(acceptor));
367        self
368    }
369
370    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
371        self.keepalive = Some(keepalive);
372        self
373    }
374
375    pub fn resumable(mut self) -> Self {
376        self.resumable = true;
377        self
378    }
379
380    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
381        self.operation_store = Some(operation_store);
382        self
383    }
384
385    #[cfg(not(target_arch = "wasm32"))]
386    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
387        self.spawn_fn = Box::new(f);
388        self
389    }
390
391    #[cfg(target_arch = "wasm32")]
392    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
393        self.spawn_fn = Box::new(f);
394        self
395    }
396
397    pub async fn establish<Client: From<DriverCaller>>(
398        self,
399        handler: impl Handler<DriverReplySink> + 'static,
400    ) -> Result<(Client, SessionHandle), SessionError>
401    where
402        S: LinkSource,
403        S::Link: Link + Send + 'static,
404        <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
405        <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
406        <S::Link as Link>::Rx: MaybeSend + Send + 'static,
407    {
408        let Self {
409            mut source,
410            mode,
411            root_settings,
412            metadata,
413            on_connection,
414            keepalive,
415            resumable,
416            operation_store,
417            spawn_fn,
418        } = self;
419
420        match mode {
421            TransportMode::Bare => {
422                let attachment = source.next_link().await.map_err(SessionError::Io)?;
423                let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
424                    .await
425                    .map_err(session_error_from_transport)?;
426                let handshake_result = handshake_as_initiator(
427                    &link.tx,
428                    &mut link.rx,
429                    root_settings.clone(),
430                    true,
431                    None,
432                )
433                .await
434                .map_err(session_error_from_handshake)?;
435                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
436                    .map_err(SessionError::Protocol)?;
437                let builder = SessionInitiatorBuilder::new(
438                    BareConduit::with_message_plan(link, message_plan),
439                    handshake_result,
440                );
441                let recoverer = Box::new(BareSourceRecoverer {
442                    source,
443                    settings: root_settings.clone(),
444                });
445                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
446                    builder,
447                    root_settings,
448                    metadata,
449                    on_connection,
450                    keepalive,
451                    resumable,
452                    Some(recoverer),
453                    operation_store,
454                    spawn_fn,
455                )
456                .establish(handler)
457                .await
458            }
459            TransportMode::Stable => {
460                // Get first link and do transport + CBOR handshake before
461                // handing to the stable conduit.
462                let attachment = source.next_link().await.map_err(SessionError::Io)?;
463                let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
464                    .await
465                    .map_err(session_error_from_transport)?;
466                let handshake_result = handshake_as_initiator(
467                    &link.tx,
468                    &mut link.rx,
469                    root_settings.clone(),
470                    true,
471                    None,
472                )
473                .await
474                .map_err(session_error_from_handshake)?;
475                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
476                    .map_err(SessionError::Protocol)?;
477                let conduit = StableConduit::<MessageFamily, _>::with_first_link(
478                    link.tx,
479                    link.rx,
480                    None, // initiator side — no ClientHello
481                    TransportedLinkSource {
482                        source,
483                        mode: TransportMode::Stable,
484                    },
485                )
486                .await
487                .map_err(|error| {
488                    SessionError::Protocol(format!("stable conduit setup failed: {error}"))
489                })?
490                .with_message_plan(message_plan);
491                let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
492                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
493                    builder,
494                    root_settings,
495                    metadata,
496                    on_connection,
497                    keepalive,
498                    resumable,
499                    None,
500                    operation_store,
501                    spawn_fn,
502                )
503                .establish(handler)
504                .await
505            }
506        }
507    }
508}
509
510pub struct SessionTransportInitiatorBuilder<'a, L> {
511    link: L,
512    mode: TransportMode,
513    root_settings: ConnectionSettings,
514    metadata: Metadata<'a>,
515    on_connection: Option<Box<dyn ConnectionAcceptor>>,
516    keepalive: Option<SessionKeepaliveConfig>,
517    resumable: bool,
518    operation_store: Option<Arc<dyn OperationStore>>,
519    spawn_fn: SpawnFn,
520}
521
522impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
523    fn new(link: L, mode: TransportMode) -> Self {
524        Self {
525            link,
526            mode,
527            root_settings: ConnectionSettings {
528                parity: Parity::Odd,
529                max_concurrent_requests: 64,
530            },
531            metadata: vec![],
532            on_connection: None,
533            keepalive: None,
534            resumable: false,
535            operation_store: None,
536            spawn_fn: default_spawn_fn(),
537        }
538    }
539
540    pub fn parity(mut self, parity: Parity) -> Self {
541        self.root_settings.parity = parity;
542        self
543    }
544
545    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
546        self.root_settings = settings;
547        self
548    }
549
550    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
551        self.root_settings.max_concurrent_requests = max_concurrent_requests;
552        self
553    }
554
555    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
556        self.metadata = metadata;
557        self
558    }
559
560    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
561        self.on_connection = Some(Box::new(acceptor));
562        self
563    }
564
565    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
566        self.keepalive = Some(keepalive);
567        self
568    }
569
570    pub fn resumable(mut self) -> Self {
571        self.resumable = true;
572        self
573    }
574
575    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
576        self.operation_store = Some(operation_store);
577        self
578    }
579
580    #[cfg(not(target_arch = "wasm32"))]
581    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
582        self.spawn_fn = Box::new(f);
583        self
584    }
585
586    #[cfg(target_arch = "wasm32")]
587    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
588        self.spawn_fn = Box::new(f);
589        self
590    }
591
592    #[cfg(not(target_arch = "wasm32"))]
593    pub async fn establish<Client: From<DriverCaller>>(
594        self,
595        handler: impl Handler<DriverReplySink> + 'static,
596    ) -> Result<(Client, SessionHandle), SessionError>
597    where
598        L: Link + Send + 'static,
599        L::Tx: MaybeSend + MaybeSync + 'static,
600        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
601        L::Rx: MaybeSend + 'static,
602    {
603        let Self {
604            link,
605            mode,
606            root_settings,
607            metadata,
608            on_connection,
609            keepalive,
610            resumable,
611            operation_store,
612            spawn_fn,
613        } = self;
614        match mode {
615            TransportMode::Bare => {
616                let link = initiate_transport(link, TransportMode::Bare)
617                    .await
618                    .map_err(session_error_from_transport)?;
619                Self::finish_with_bare_parts(
620                    link,
621                    root_settings,
622                    metadata,
623                    on_connection,
624                    keepalive,
625                    resumable,
626                    operation_store,
627                    spawn_fn,
628                    handler,
629                )
630                .await
631            }
632            TransportMode::Stable => {
633                let link = initiate_transport(link, TransportMode::Stable)
634                    .await
635                    .map_err(session_error_from_transport)?;
636                Self::finish_with_stable_parts(
637                    link,
638                    root_settings,
639                    metadata,
640                    on_connection,
641                    keepalive,
642                    resumable,
643                    operation_store,
644                    spawn_fn,
645                    handler,
646                )
647                .await
648            }
649        }
650    }
651
652    #[cfg(target_arch = "wasm32")]
653    pub async fn establish<Client: From<DriverCaller>>(
654        self,
655        handler: impl Handler<DriverReplySink> + 'static,
656    ) -> Result<(Client, SessionHandle), SessionError>
657    where
658        L: Link + 'static,
659        L::Tx: MaybeSend + MaybeSync + 'static,
660        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
661        L::Rx: MaybeSend + 'static,
662    {
663        let Self {
664            link,
665            mode,
666            root_settings,
667            metadata,
668            on_connection,
669            keepalive,
670            resumable,
671            operation_store,
672            spawn_fn,
673        } = self;
674        match mode {
675            TransportMode::Bare => {
676                let link = initiate_transport(link, TransportMode::Bare)
677                    .await
678                    .map_err(session_error_from_transport)?;
679                Self::finish_with_bare_parts(
680                    link,
681                    root_settings,
682                    metadata,
683                    on_connection,
684                    keepalive,
685                    resumable,
686                    operation_store,
687                    spawn_fn,
688                    handler,
689                )
690                .await
691            }
692            TransportMode::Stable => Err(SessionError::Protocol(
693                "stable conduit transport selection is unsupported on wasm".into(),
694            )),
695        }
696    }
697
698    #[allow(clippy::too_many_arguments)]
699    async fn finish_with_bare_parts<Client: From<DriverCaller>>(
700        mut link: SplitLink<L::Tx, L::Rx>,
701        root_settings: ConnectionSettings,
702        metadata: Metadata<'a>,
703        on_connection: Option<Box<dyn ConnectionAcceptor>>,
704        keepalive: Option<SessionKeepaliveConfig>,
705        resumable: bool,
706        operation_store: Option<Arc<dyn OperationStore>>,
707        spawn_fn: SpawnFn,
708        handler: impl Handler<DriverReplySink> + 'static,
709    ) -> Result<(Client, SessionHandle), SessionError>
710    where
711        L: Link + 'static,
712        L::Tx: MaybeSend + MaybeSync + 'static,
713        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
714        L::Rx: MaybeSend + 'static,
715    {
716        let handshake_result =
717            handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
718                .await
719                .map_err(session_error_from_handshake)?;
720        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
721            .map_err(SessionError::Protocol)?;
722        let builder = SessionInitiatorBuilder::new(
723            BareConduit::with_message_plan(link, message_plan),
724            handshake_result,
725        );
726        Self::apply_common_parts(
727            builder,
728            root_settings,
729            metadata,
730            on_connection,
731            keepalive,
732            resumable,
733            None,
734            operation_store,
735            spawn_fn,
736        )
737        .establish(handler)
738        .await
739    }
740
741    #[cfg(not(target_arch = "wasm32"))]
742    #[allow(clippy::too_many_arguments)]
743    async fn finish_with_stable_parts<Client: From<DriverCaller>>(
744        mut link: SplitLink<L::Tx, L::Rx>,
745        root_settings: ConnectionSettings,
746        metadata: Metadata<'a>,
747        on_connection: Option<Box<dyn ConnectionAcceptor>>,
748        keepalive: Option<SessionKeepaliveConfig>,
749        resumable: bool,
750        operation_store: Option<Arc<dyn OperationStore>>,
751        spawn_fn: SpawnFn,
752        handler: impl Handler<DriverReplySink> + 'static,
753    ) -> Result<(Client, SessionHandle), SessionError>
754    where
755        L: Link + Send + 'static,
756        L::Tx: MaybeSend + MaybeSync + Send + 'static,
757        for<'p> <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
758        L::Rx: MaybeSend + Send + 'static,
759    {
760        let handshake_result =
761            handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
762                .await
763                .map_err(session_error_from_handshake)?;
764        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
765            .map_err(SessionError::Protocol)?;
766        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
767            link.tx,
768            link.rx,
769            None,
770            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
771        )
772        .await
773        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
774        .with_message_plan(message_plan);
775        let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
776        Self::apply_common_parts(
777            builder,
778            root_settings,
779            metadata,
780            on_connection,
781            keepalive,
782            resumable,
783            None,
784            operation_store,
785            spawn_fn,
786        )
787        .establish(handler)
788        .await
789    }
790
791    #[allow(clippy::too_many_arguments)]
792    #[allow(clippy::too_many_arguments)]
793    fn apply_common_parts<C>(
794        mut builder: SessionInitiatorBuilder<'a, C>,
795        root_settings: ConnectionSettings,
796        metadata: Metadata<'a>,
797        on_connection: Option<Box<dyn ConnectionAcceptor>>,
798        keepalive: Option<SessionKeepaliveConfig>,
799        resumable: bool,
800        recoverer: Option<Box<dyn ConduitRecoverer>>,
801        operation_store: Option<Arc<dyn OperationStore>>,
802        spawn_fn: SpawnFn,
803    ) -> SessionInitiatorBuilder<'a, C> {
804        builder.root_settings = root_settings;
805        builder.metadata = metadata;
806        builder.on_connection = on_connection;
807        builder.keepalive = keepalive;
808        builder.resumable = resumable;
809        builder.recoverer = recoverer;
810        builder.operation_store = operation_store;
811        builder.spawn_fn = spawn_fn;
812        builder
813    }
814}
815
816#[cfg(not(target_arch = "wasm32"))]
817struct BareSourceRecoverer<S> {
818    source: S,
819    settings: ConnectionSettings,
820}
821
822#[cfg(not(target_arch = "wasm32"))]
823impl<S> ConduitRecoverer for BareSourceRecoverer<S>
824where
825    S: LinkSource,
826    S::Link: Link + Send + 'static,
827    <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
828    <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
829    <S::Link as Link>::Rx: MaybeSend + Send + 'static,
830{
831    fn next_conduit<'a>(
832        &'a mut self,
833        resume_key: Option<&'a SessionResumeKey>,
834    ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
835        Box::pin(async move {
836            let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
837            let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
838                .await
839                .map_err(session_error_from_transport)?;
840            let handshake_result = handshake_as_initiator(
841                &link.tx,
842                &mut link.rx,
843                self.settings.clone(),
844                true,
845                resume_key,
846            )
847            .await
848            .map_err(session_error_from_handshake)?;
849            let conduit = BareConduit::<MessageFamily, _>::new(link);
850            let (tx, rx) = conduit.split();
851            Ok(super::RecoveredConduit {
852                tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
853                rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
854                handshake: handshake_result,
855            })
856        })
857    }
858}
859
860#[cfg(not(target_arch = "wasm32"))]
861struct TransportedLinkSource<S> {
862    source: S,
863    mode: TransportMode,
864}
865
866#[cfg(not(target_arch = "wasm32"))]
867impl<S> LinkSource for TransportedLinkSource<S>
868where
869    S: LinkSource,
870    S::Link: Link + Send + 'static,
871    <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
872    <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
873    <S::Link as Link>::Rx: MaybeSend + Send + 'static,
874{
875    type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
876
877    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
878        let attachment = self.source.next_link().await?;
879        let link = initiate_transport(attachment.into_link(), self.mode)
880            .await
881            .map_err(std::io::Error::other)?;
882        Ok(Attachment::initiator(link))
883    }
884}
885
886pub struct SessionAcceptorBuilder<'a, C> {
887    conduit: C,
888    handshake_result: HandshakeResult,
889    root_settings: ConnectionSettings,
890    metadata: Metadata<'a>,
891    on_connection: Option<Box<dyn ConnectionAcceptor>>,
892    keepalive: Option<SessionKeepaliveConfig>,
893    resumable: bool,
894    session_registry: Option<SessionRegistry>,
895    operation_store: Option<Arc<dyn OperationStore>>,
896    spawn_fn: SpawnFn,
897}
898
899impl<'a, C> SessionAcceptorBuilder<'a, C> {
900    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
901        let root_settings = handshake_result.our_settings.clone();
902        Self {
903            conduit,
904            handshake_result,
905            root_settings,
906            metadata: vec![],
907            on_connection: None,
908            keepalive: None,
909            resumable: false,
910            session_registry: None,
911            operation_store: None,
912            spawn_fn: default_spawn_fn(),
913        }
914    }
915
916    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
917        self.on_connection = Some(Box::new(acceptor));
918        self
919    }
920
921    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
922        self.keepalive = Some(keepalive);
923        self
924    }
925
926    pub fn resumable(mut self) -> Self {
927        self.resumable = true;
928        self
929    }
930
931    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
932        self.session_registry = Some(session_registry);
933        self
934    }
935
936    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
937        self.operation_store = Some(operation_store);
938        self
939    }
940
941    /// Override the function used to spawn the session background task.
942    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
943    #[cfg(not(target_arch = "wasm32"))]
944    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
945        self.spawn_fn = Box::new(f);
946        self
947    }
948
949    /// Override the function used to spawn the session background task.
950    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
951    #[cfg(target_arch = "wasm32")]
952    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
953        self.spawn_fn = Box::new(f);
954        self
955    }
956
957    #[moire::instrument]
958    pub async fn establish<Client: From<DriverCaller>>(
959        self,
960        handler: impl Handler<DriverReplySink> + 'static,
961    ) -> Result<(Client, SessionHandle), SessionError>
962    where
963        C: Conduit<Msg = MessageFamily> + 'static,
964        C::Tx: MaybeSend + MaybeSync + 'static,
965        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
966        C::Rx: MaybeSend + 'static,
967    {
968        let Self {
969            conduit,
970            handshake_result,
971            root_settings,
972            metadata: _metadata,
973            on_connection,
974            keepalive,
975            resumable,
976            session_registry,
977            operation_store,
978            spawn_fn,
979        } = self;
980        validate_negotiated_root_settings(&root_settings, &handshake_result)?;
981        let (tx, rx) = conduit.split();
982        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
983        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
984        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
985        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
986        let mut session = Session::pre_handshake(
987            tx,
988            rx,
989            on_connection,
990            open_rx,
991            close_rx,
992            resume_rx,
993            control_tx.clone(),
994            control_rx,
995            keepalive,
996            resumable,
997            None,
998        );
999        let handle = session.establish_from_handshake(handshake_result)?;
1000        let resume_key = session.resume_key();
1001        let session_handle = SessionHandle {
1002            open_tx,
1003            close_tx,
1004            resume_tx,
1005            control_tx,
1006            resume_key,
1007        };
1008        if let (Some(registry), Some(key)) = (&session_registry, resume_key) {
1009            registry.insert(key, session_handle.clone());
1010        }
1011        let mut driver = match operation_store {
1012            Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
1013            None => Driver::new(handle, handler),
1014        };
1015        let client = Client::from(driver.caller());
1016        (spawn_fn)(Box::pin(async move { session.run().await }));
1017        #[cfg(not(target_arch = "wasm32"))]
1018        tokio::spawn(async move { driver.run().await });
1019        #[cfg(target_arch = "wasm32")]
1020        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
1021        Ok((client, session_handle))
1022    }
1023
1024    #[moire::instrument]
1025    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1026        self,
1027        handler: impl Handler<DriverReplySink> + 'static,
1028    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1029    where
1030        C: Conduit<Msg = MessageFamily> + 'static,
1031        C::Tx: MaybeSend + MaybeSync + 'static,
1032        for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
1033        C::Rx: MaybeSend + 'static,
1034    {
1035        // With the CBOR handshake, resume detection happens at the link level
1036        // before the conduit is created. If the peer sent a resume key in the Hello
1037        // that matches a known session, we resume. Otherwise, we establish.
1038        if let (Some(registry), Some(resume_key)) = (
1039            &self.session_registry,
1040            self.handshake_result.peer_resume_key,
1041        ) {
1042            if let Some(handle) = registry.get(&resume_key) {
1043                let (tx, rx) = self.conduit.split();
1044                if let Err(error) = handle
1045                    .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1046                    .await
1047                {
1048                    registry.remove(&resume_key);
1049                    return Err(error);
1050                }
1051                return Ok(SessionAcceptOutcome::Resumed);
1052            }
1053            return Err(SessionError::Protocol("unknown session resume key".into()));
1054        }
1055
1056        let (client, session_handle) = self.establish(handler).await?;
1057        Ok(SessionAcceptOutcome::Established(client, session_handle))
1058    }
1059}
1060
1061pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1062    link: L,
1063    root_settings: ConnectionSettings,
1064    metadata: Metadata<'a>,
1065    on_connection: Option<Box<dyn ConnectionAcceptor>>,
1066    keepalive: Option<SessionKeepaliveConfig>,
1067    resumable: bool,
1068    session_registry: Option<SessionRegistry>,
1069    operation_store: Option<Arc<dyn OperationStore>>,
1070    spawn_fn: SpawnFn,
1071}
1072
1073impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1074    fn new(link: L) -> Self {
1075        Self {
1076            link,
1077            root_settings: ConnectionSettings {
1078                parity: Parity::Even,
1079                max_concurrent_requests: 64,
1080            },
1081            metadata: vec![],
1082            on_connection: None,
1083            keepalive: None,
1084            resumable: true,
1085            session_registry: None,
1086            operation_store: None,
1087            spawn_fn: default_spawn_fn(),
1088        }
1089    }
1090
1091    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1092        self.root_settings = settings;
1093        self
1094    }
1095
1096    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1097        self.root_settings.max_concurrent_requests = max_concurrent_requests;
1098        self
1099    }
1100
1101    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1102        self.metadata = metadata;
1103        self
1104    }
1105
1106    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1107        self.on_connection = Some(Box::new(acceptor));
1108        self
1109    }
1110
1111    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1112        self.keepalive = Some(keepalive);
1113        self
1114    }
1115
1116    pub fn resumable(mut self) -> Self {
1117        self.resumable = true;
1118        self
1119    }
1120
1121    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1122        self.session_registry = Some(session_registry);
1123        self
1124    }
1125
1126    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1127        self.operation_store = Some(operation_store);
1128        self
1129    }
1130
1131    #[cfg(not(target_arch = "wasm32"))]
1132    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1133        self.spawn_fn = Box::new(f);
1134        self
1135    }
1136
1137    #[cfg(target_arch = "wasm32")]
1138    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1139        self.spawn_fn = Box::new(f);
1140        self
1141    }
1142
1143    #[moire::instrument]
1144    #[cfg(not(target_arch = "wasm32"))]
1145    pub async fn establish<Client: From<DriverCaller>>(
1146        self,
1147        handler: impl Handler<DriverReplySink> + 'static,
1148    ) -> Result<(Client, SessionHandle), SessionError>
1149    where
1150        L: Link + Send + 'static,
1151        L::Tx: MaybeSend + MaybeSync + 'static,
1152        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1153        L::Rx: MaybeSend + 'static,
1154    {
1155        let Self {
1156            link,
1157            root_settings,
1158            metadata,
1159            on_connection,
1160            keepalive,
1161            resumable,
1162            session_registry,
1163            operation_store,
1164            spawn_fn,
1165        } = self;
1166        let (mode, mut link) = accept_transport(link)
1167            .await
1168            .map_err(session_error_from_transport)?;
1169        match mode {
1170            TransportMode::Bare => {
1171                let handshake_result = handshake_as_acceptor(
1172                    &link.tx,
1173                    &mut link.rx,
1174                    root_settings.clone(),
1175                    true,
1176                    resumable,
1177                    None,
1178                )
1179                .await
1180                .map_err(session_error_from_handshake)?;
1181                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1182                    .map_err(SessionError::Protocol)?;
1183                let builder = SessionAcceptorBuilder::new(
1184                    BareConduit::with_message_plan(link, message_plan),
1185                    handshake_result,
1186                );
1187                Self::apply_common_parts(
1188                    builder,
1189                    root_settings,
1190                    metadata,
1191                    on_connection,
1192                    keepalive,
1193                    resumable,
1194                    session_registry,
1195                    operation_store,
1196                    spawn_fn,
1197                )
1198                .establish(handler)
1199                .await
1200            }
1201            TransportMode::Stable => {
1202                Self::finish_with_stable_parts(
1203                    link,
1204                    root_settings,
1205                    metadata,
1206                    on_connection,
1207                    keepalive,
1208                    resumable,
1209                    session_registry,
1210                    operation_store,
1211                    spawn_fn,
1212                    handler,
1213                )
1214                .await
1215            }
1216        }
1217    }
1218
1219    #[moire::instrument]
1220    #[cfg(not(target_arch = "wasm32"))]
1221    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1222        self,
1223        handler: impl Handler<DriverReplySink> + 'static,
1224    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1225    where
1226        L: Link + Send + 'static,
1227        L::Tx: MaybeSend + MaybeSync + 'static,
1228        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1229        L::Rx: MaybeSend + 'static,
1230    {
1231        let Self {
1232            link,
1233            root_settings,
1234            metadata,
1235            on_connection,
1236            keepalive,
1237            resumable,
1238            session_registry,
1239            operation_store,
1240            spawn_fn,
1241        } = self;
1242        let (mode, mut link) = accept_transport(link)
1243            .await
1244            .map_err(session_error_from_transport)?;
1245        match mode {
1246            TransportMode::Bare => {
1247                let handshake_result = handshake_as_acceptor(
1248                    &link.tx,
1249                    &mut link.rx,
1250                    root_settings.clone(),
1251                    true,
1252                    resumable,
1253                    None,
1254                )
1255                .await
1256                .map_err(session_error_from_handshake)?;
1257                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1258                    .map_err(SessionError::Protocol)?;
1259                let builder = SessionAcceptorBuilder::new(
1260                    BareConduit::with_message_plan(link, message_plan),
1261                    handshake_result,
1262                );
1263                Self::apply_common_parts(
1264                    builder,
1265                    root_settings,
1266                    metadata,
1267                    on_connection,
1268                    keepalive,
1269                    resumable,
1270                    session_registry,
1271                    operation_store,
1272                    spawn_fn,
1273                )
1274                .establish_or_resume(handler)
1275                .await
1276            }
1277            TransportMode::Stable => Self::finish_with_stable_parts(
1278                link,
1279                root_settings,
1280                metadata,
1281                on_connection,
1282                keepalive,
1283                resumable,
1284                session_registry,
1285                operation_store,
1286                spawn_fn,
1287                handler,
1288            )
1289            .await
1290            .map(|(client, handle)| SessionAcceptOutcome::Established(client, handle)),
1291        }
1292    }
1293
1294    #[cfg(target_arch = "wasm32")]
1295    pub async fn establish<Client: From<DriverCaller>>(
1296        self,
1297        handler: impl Handler<DriverReplySink> + 'static,
1298    ) -> Result<(Client, SessionHandle), SessionError>
1299    where
1300        L: Link + 'static,
1301        L::Tx: MaybeSend + MaybeSync + 'static,
1302        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1303        L::Rx: MaybeSend + 'static,
1304    {
1305        let Self {
1306            link,
1307            root_settings,
1308            metadata,
1309            on_connection,
1310            keepalive,
1311            resumable,
1312            session_registry,
1313            operation_store,
1314            spawn_fn,
1315        } = self;
1316        let (mode, mut link) = accept_transport(link)
1317            .await
1318            .map_err(session_error_from_transport)?;
1319        match mode {
1320            TransportMode::Bare => {
1321                let handshake_result = handshake_as_acceptor(
1322                    &link.tx,
1323                    &mut link.rx,
1324                    root_settings.clone(),
1325                    true,
1326                    resumable,
1327                    None,
1328                )
1329                .await
1330                .map_err(session_error_from_handshake)?;
1331                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1332                    .map_err(SessionError::Protocol)?;
1333                let builder = SessionAcceptorBuilder::new(
1334                    BareConduit::with_message_plan(link, message_plan),
1335                    handshake_result,
1336                );
1337                Self::apply_common_parts(
1338                    builder,
1339                    root_settings,
1340                    metadata,
1341                    on_connection,
1342                    keepalive,
1343                    resumable,
1344                    session_registry,
1345                    operation_store,
1346                    spawn_fn,
1347                )
1348                .establish(handler)
1349                .await
1350            }
1351            TransportMode::Stable => Err(SessionError::Protocol(
1352                "stable conduit transport selection is unsupported on wasm".into(),
1353            )),
1354        }
1355    }
1356
1357    #[cfg(target_arch = "wasm32")]
1358    pub async fn establish_or_resume<Client: From<DriverCaller>>(
1359        self,
1360        handler: impl Handler<DriverReplySink> + 'static,
1361    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1362    where
1363        L: Link + 'static,
1364        L::Tx: MaybeSend + MaybeSync + 'static,
1365        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1366        L::Rx: MaybeSend + 'static,
1367    {
1368        let Self {
1369            link,
1370            root_settings,
1371            metadata,
1372            on_connection,
1373            keepalive,
1374            resumable,
1375            session_registry,
1376            operation_store,
1377            spawn_fn,
1378        } = self;
1379        let (mode, mut link) = accept_transport(link)
1380            .await
1381            .map_err(session_error_from_transport)?;
1382        match mode {
1383            TransportMode::Bare => {
1384                let handshake_result = handshake_as_acceptor(
1385                    &link.tx,
1386                    &mut link.rx,
1387                    root_settings.clone(),
1388                    true,
1389                    resumable,
1390                    None,
1391                )
1392                .await
1393                .map_err(session_error_from_handshake)?;
1394                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1395                    .map_err(SessionError::Protocol)?;
1396                let builder = SessionAcceptorBuilder::new(
1397                    BareConduit::with_message_plan(link, message_plan),
1398                    handshake_result,
1399                );
1400                Self::apply_common_parts(
1401                    builder,
1402                    root_settings,
1403                    metadata,
1404                    on_connection,
1405                    keepalive,
1406                    resumable,
1407                    session_registry,
1408                    operation_store,
1409                    spawn_fn,
1410                )
1411                .establish_or_resume(handler)
1412                .await
1413            }
1414            TransportMode::Stable => Err(SessionError::Protocol(
1415                "stable conduit transport selection is unsupported on wasm".into(),
1416            )),
1417        }
1418    }
1419
1420    #[cfg(not(target_arch = "wasm32"))]
1421    #[allow(clippy::too_many_arguments)]
1422    async fn finish_with_stable_parts<Client: From<DriverCaller>>(
1423        mut link: SplitLink<L::Tx, L::Rx>,
1424        root_settings: ConnectionSettings,
1425        metadata: Metadata<'a>,
1426        on_connection: Option<Box<dyn ConnectionAcceptor>>,
1427        keepalive: Option<SessionKeepaliveConfig>,
1428        resumable: bool,
1429        session_registry: Option<SessionRegistry>,
1430        operation_store: Option<Arc<dyn OperationStore>>,
1431        spawn_fn: SpawnFn,
1432        handler: impl Handler<DriverReplySink> + 'static,
1433    ) -> Result<(Client, SessionHandle), SessionError>
1434    where
1435        L: Link + Send + 'static,
1436        L::Tx: MaybeSend + MaybeSync + Send + 'static,
1437        <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1438        L::Rx: MaybeSend + Send + 'static,
1439    {
1440        let handshake_result = handshake_as_acceptor(
1441            &link.tx,
1442            &mut link.rx,
1443            root_settings.clone(),
1444            true,
1445            resumable,
1446            None,
1447        )
1448        .await
1449        .map_err(session_error_from_handshake)?;
1450        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1451            .map_err(SessionError::Protocol)?;
1452        // Read the stable conduit's ClientHello — the initiator sends it
1453        // after the CBOR session handshake completes.
1454        let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1455            .await
1456            .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1457        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1458            link.tx,
1459            link.rx,
1460            Some(client_hello),
1461            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1462        )
1463        .await
1464        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1465        .with_message_plan(message_plan);
1466        let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1467        Self::apply_common_parts(
1468            builder,
1469            root_settings,
1470            metadata,
1471            on_connection,
1472            keepalive,
1473            resumable,
1474            session_registry,
1475            operation_store,
1476            spawn_fn,
1477        )
1478        .establish(handler)
1479        .await
1480    }
1481
1482    #[allow(clippy::too_many_arguments)]
1483    #[allow(clippy::too_many_arguments)]
1484    fn apply_common_parts<C>(
1485        mut builder: SessionAcceptorBuilder<'a, C>,
1486        root_settings: ConnectionSettings,
1487        metadata: Metadata<'a>,
1488        on_connection: Option<Box<dyn ConnectionAcceptor>>,
1489        keepalive: Option<SessionKeepaliveConfig>,
1490        resumable: bool,
1491        session_registry: Option<SessionRegistry>,
1492        operation_store: Option<Arc<dyn OperationStore>>,
1493        spawn_fn: SpawnFn,
1494    ) -> SessionAcceptorBuilder<'a, C> {
1495        builder.root_settings = root_settings;
1496        builder.metadata = metadata;
1497        builder.on_connection = on_connection;
1498        builder.keepalive = keepalive;
1499        builder.resumable = resumable;
1500        builder.session_registry = session_registry;
1501        builder.operation_store = operation_store;
1502        builder.spawn_fn = spawn_fn;
1503        builder
1504    }
1505}
1506
1507fn validate_negotiated_root_settings(
1508    expected_root_settings: &ConnectionSettings,
1509    handshake_result: &HandshakeResult,
1510) -> Result<(), SessionError> {
1511    if handshake_result.our_settings != *expected_root_settings {
1512        return Err(SessionError::Protocol(
1513            "negotiated root settings do not match builder settings".into(),
1514        ));
1515    }
1516    Ok(())
1517}
1518
1519fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1520    match error {
1521        crate::HandshakeError::Io(io) => SessionError::Io(io),
1522        crate::HandshakeError::PeerClosed => {
1523            SessionError::Protocol("peer closed during handshake".into())
1524        }
1525        crate::HandshakeError::NotResumable => SessionError::NotResumable,
1526        other => SessionError::Protocol(other.to_string()),
1527    }
1528}
1529
1530fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1531    match error {
1532        crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1533        crate::TransportPrologueError::LinkDead => {
1534            SessionError::Protocol("link closed during transport prologue".into())
1535        }
1536        crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1537        crate::TransportPrologueError::Rejected(reason) => {
1538            SessionError::Protocol(format!("transport rejected: {reason}"))
1539        }
1540    }
1541}