Skip to main content

vox_core/session/
builders.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12    Conduit, ConnectionSettings, HandshakeResult, Link, MaybeSend, MaybeSync, MessageFamily,
13    Metadata, Parity, SessionResumeKey, SplitLink, metadata_into_owned,
14};
15
16use crate::{Attachment, LinkSource, StableConduit};
17use crate::{
18    BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
19    handshake_as_acceptor, handshake_as_initiator, initiate_transport,
20};
21
22use super::{
23    CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
24    SessionHandle, SessionKeepaliveConfig,
25};
26use crate::FromVoxSession;
27
28/// Well-known metadata key for service name routing.
29pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
30
31/// Inject `vox-service` metadata from `Client::SERVICE_NAME`.
32fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
33    metadata.push(vox_types::MetadataEntry {
34        key: VOX_SERVICE_METADATA_KEY.into(),
35        value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
36        flags: vox_types::MetadataFlags::NONE,
37    });
38}
39
40/// A pinned, boxed session future. On non-WASM this is `Send + 'static`;
41/// on WASM it's `'static` only (no `Send` requirement).
42#[cfg(not(target_arch = "wasm32"))]
43pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
44#[cfg(target_arch = "wasm32")]
45pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
46
47#[cfg(not(target_arch = "wasm32"))]
48type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
49#[cfg(target_arch = "wasm32")]
50type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
51
52#[cfg(not(target_arch = "wasm32"))]
53fn default_spawn_fn() -> SpawnFn {
54    Box::new(|fut| {
55        tokio::spawn(fut);
56    })
57}
58
59#[cfg(target_arch = "wasm32")]
60fn default_spawn_fn() -> SpawnFn {
61    Box::new(|fut| {
62        wasm_bindgen_futures::spawn_local(fut);
63    })
64}
65
66// r[impl rpc.session-setup]
67// r[impl session.role]
68pub fn initiator_conduit<I: IntoConduit>(
69    into_conduit: I,
70    handshake_result: HandshakeResult,
71) -> SessionInitiatorBuilder<'static, I::Conduit> {
72    SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
73}
74
75pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
76where
77    S: LinkSource,
78{
79    SessionSourceInitiatorBuilder::new(source, mode)
80}
81
82pub fn acceptor_conduit<I: IntoConduit>(
83    into_conduit: I,
84    handshake_result: HandshakeResult,
85) -> SessionAcceptorBuilder<'static, I::Conduit> {
86    SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
87}
88
89/// Convenience: perform CBOR handshake as initiator on a raw link, then return
90/// a builder with the conduit ready to go.
91pub async fn initiator_on_link<L: Link>(
92    link: L,
93    settings: ConnectionSettings,
94) -> Result<
95    SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
96    SessionError,
97>
98where
99    L::Tx: MaybeSend + MaybeSync + 'static,
100    L::Rx: MaybeSend + 'static,
101{
102    let (tx, mut rx) = link.split();
103    let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
104        .await
105        .map_err(session_error_from_handshake)?;
106    let message_plan =
107        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
108    Ok(SessionInitiatorBuilder::new(
109        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
110        handshake_result,
111    ))
112}
113
114/// Convenience: perform CBOR handshake as acceptor on a raw link, then return
115/// a builder with the conduit ready to go.
116pub async fn acceptor_on_link<L: Link>(
117    link: L,
118    settings: ConnectionSettings,
119) -> Result<
120    SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
121    SessionError,
122>
123where
124    L::Tx: MaybeSend + MaybeSync + 'static,
125    L::Rx: MaybeSend + 'static,
126{
127    let (tx, mut rx) = link.split();
128    let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
129        .await
130        .map_err(session_error_from_handshake)?;
131    let message_plan =
132        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
133    Ok(SessionAcceptorBuilder::new(
134        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
135        handshake_result,
136    ))
137}
138
139pub fn initiator_on<L: Link>(
140    link: L,
141    mode: TransportMode,
142) -> SessionTransportInitiatorBuilder<'static, L> {
143    SessionTransportInitiatorBuilder::new(link, mode)
144}
145
146pub fn initiator_transport<L: Link>(
147    link: L,
148    mode: TransportMode,
149) -> SessionTransportInitiatorBuilder<'static, L> {
150    initiator_on(link, mode)
151}
152
153pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
154    SessionTransportAcceptorBuilder::new(link)
155}
156
157pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
158    acceptor_on(link)
159}
160
161#[derive(Clone, Default)]
162pub struct SessionRegistry {
163    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
164}
165
166impl SessionRegistry {
167    fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
168        self.inner
169            .lock()
170            .expect("session registry poisoned")
171            .get(key)
172            .cloned()
173    }
174
175    fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
176        self.inner
177            .lock()
178            .expect("session registry poisoned")
179            .insert(key, handle);
180    }
181
182    fn remove(&self, key: &SessionResumeKey) {
183        self.inner
184            .lock()
185            .expect("session registry poisoned")
186            .remove(key);
187    }
188}
189
190pub enum SessionAcceptOutcome<Client> {
191    Established(Client),
192    Resumed,
193}
194
195/// Shared configuration for all session builders.
196pub struct SessionConfig<'a> {
197    pub root_settings: ConnectionSettings,
198    pub metadata: Metadata<'a>,
199    pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
200    pub keepalive: Option<SessionKeepaliveConfig>,
201    pub resumable: bool,
202    pub session_registry: Option<SessionRegistry>,
203    pub operation_store: Option<Arc<dyn OperationStore>>,
204    pub spawn_fn: SpawnFn,
205    pub connect_timeout: Option<std::time::Duration>,
206    pub recovery_timeout: Option<std::time::Duration>,
207}
208
209impl SessionConfig<'_> {
210    fn with_settings(root_settings: ConnectionSettings) -> Self {
211        Self {
212            root_settings,
213            metadata: vec![],
214            on_connection: None,
215            keepalive: None,
216            resumable: true,
217            session_registry: None,
218            operation_store: None,
219            spawn_fn: default_spawn_fn(),
220            connect_timeout: None,
221            recovery_timeout: None,
222        }
223    }
224}
225
226impl Default for SessionConfig<'_> {
227    fn default() -> Self {
228        Self::with_settings(ConnectionSettings {
229            parity: Parity::Odd,
230            max_concurrent_requests: 64,
231        })
232    }
233}
234
235pub struct SessionInitiatorBuilder<'a, C> {
236    conduit: C,
237    handshake_result: HandshakeResult,
238    config: SessionConfig<'a>,
239    recoverer: Option<Box<dyn ConduitRecoverer>>,
240}
241
242impl<'a, C> SessionInitiatorBuilder<'a, C> {
243    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
244        let root_settings = handshake_result.our_settings.clone();
245        let mut config = SessionConfig::with_settings(root_settings);
246        // Conduit builders default to non-resumable — callers opt in with .resumable()
247        config.resumable = false;
248        Self {
249            conduit,
250            handshake_result,
251            config,
252            recoverer: None,
253        }
254    }
255
256    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
257        self.config.on_connection = Some(Arc::new(acceptor));
258        self
259    }
260
261    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
262        self.config.keepalive = Some(keepalive);
263        self
264    }
265
266    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
267        self.config.connect_timeout = Some(timeout);
268        self
269    }
270
271    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
272        self.config.recovery_timeout = Some(timeout);
273        self
274    }
275
276    pub fn resumable(mut self) -> Self {
277        self.config.resumable = true;
278        self
279    }
280
281    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
282        self.config.operation_store = Some(operation_store);
283        self
284    }
285
286    /// Override the function used to spawn the session background task.
287    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
288    #[cfg(not(target_arch = "wasm32"))]
289    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
290        self.config.spawn_fn = Box::new(f);
291        self
292    }
293
294    /// Override the function used to spawn the session background task.
295    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
296    #[cfg(target_arch = "wasm32")]
297    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
298        self.config.spawn_fn = Box::new(f);
299        self
300    }
301
302    /// Establish a session using the given settings, on the given link source, etc,
303    ///
304    ///   - requiring (as an arg) a handler for the service the local peer will serve
305    ///   - returning a caller for the service we expect the remote peer to serve
306    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
307    where
308        C: Conduit<Msg = MessageFamily> + 'static,
309        C::Tx: MaybeSend + MaybeSync + 'static,
310        C::Rx: MaybeSend + 'static,
311    {
312        let Self {
313            conduit,
314            mut handshake_result,
315            config,
316            recoverer,
317        } = self;
318        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
319        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
320        let (tx, rx) = conduit.split();
321        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
322        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
323        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
324        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
325        let acceptor: Arc<dyn ConnectionAcceptor> =
326            config.on_connection.unwrap_or_else(|| Arc::new(()));
327        let mut session = Session::pre_handshake(
328            tx,
329            rx,
330            Some(acceptor.clone()),
331            open_rx,
332            close_rx,
333            resume_rx,
334            control_tx.clone(),
335            control_rx,
336            config.keepalive,
337            config.resumable,
338            recoverer,
339            config.recovery_timeout,
340        );
341        let handle = session.establish_from_handshake(handshake_result)?;
342        let resume_key = session.resume_key();
343        let session_handle = SessionHandle {
344            open_tx,
345            close_tx,
346            resume_tx,
347            control_tx,
348            resume_key,
349        };
350        // Route the root connection through the acceptor.
351        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
352        let pending = super::PendingConnection::with_caller_slot(
353            handle,
354            caller_slot.clone(),
355            config.operation_store,
356        );
357        peer_metadata.push(vox_types::MetadataEntry::str(
358            VOX_SERVICE_METADATA_KEY,
359            Client::SERVICE_NAME,
360        ));
361        let request = super::ConnectionRequest::new(&peer_metadata)?;
362        acceptor
363            .accept(&request, pending)
364            .map_err(SessionError::Rejected)?;
365        let caller =
366            caller_slot.lock().unwrap().take().expect(
367                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
368            );
369        let client = Client::from_vox_session(caller, Some(session_handle));
370        (config.spawn_fn)(Box::pin(async move { session.run().await }));
371        Ok(client)
372    }
373}
374
375pub struct SessionSourceInitiatorBuilder<'a, S> {
376    source: S,
377    mode: TransportMode,
378    config: SessionConfig<'a>,
379}
380
381impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
382    fn new(source: S, mode: TransportMode) -> Self {
383        let config = SessionConfig {
384            resumable: false,
385            ..SessionConfig::default()
386        };
387        Self {
388            source,
389            mode,
390            config,
391        }
392    }
393
394    pub fn parity(mut self, parity: Parity) -> Self {
395        self.config.root_settings.parity = parity;
396        self
397    }
398
399    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
400        self.config.root_settings = settings;
401        self
402    }
403
404    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
405        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
406        self
407    }
408
409    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
410        self.config.metadata = metadata;
411        self
412    }
413
414    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
415        self.config.on_connection = Some(Arc::new(acceptor));
416        self
417    }
418
419    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
420        self.config.keepalive = Some(keepalive);
421        self
422    }
423
424    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
425        self.config.connect_timeout = Some(timeout);
426        self
427    }
428
429    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
430        self.config.recovery_timeout = Some(timeout);
431        self
432    }
433
434    pub fn resumable(mut self) -> Self {
435        self.config.resumable = true;
436        self
437    }
438
439    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
440        self.config.operation_store = Some(operation_store);
441        self
442    }
443
444    #[cfg(not(target_arch = "wasm32"))]
445    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
446        self.config.spawn_fn = Box::new(f);
447        self
448    }
449
450    #[cfg(target_arch = "wasm32")]
451    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
452        self.config.spawn_fn = Box::new(f);
453        self
454    }
455
456    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
457    where
458        S: LinkSource,
459        S::Link: Link + MaybeSend + 'static,
460        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
461        <S::Link as Link>::Rx: MaybeSend + 'static,
462    {
463        let connect_timeout = self.config.connect_timeout;
464        let fut = self.establish_inner::<Client>();
465        match connect_timeout {
466            Some(timeout) => time::timeout(timeout, fut)
467                .await
468                .map_err(|_| SessionError::ConnectTimeout)?,
469            None => fut.await,
470        }
471    }
472
473    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
474    where
475        S: LinkSource,
476        S::Link: Link + MaybeSend + 'static,
477        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
478        <S::Link as Link>::Rx: MaybeSend + 'static,
479    {
480        let Self {
481            mut source,
482            mode,
483            mut config,
484        } = self;
485        inject_service_metadata::<Client>(&mut config.metadata);
486
487        match mode {
488            TransportMode::Bare => {
489                let attachment = source.next_link().await.map_err(SessionError::Io)?;
490                let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
491                    .await
492                    .map_err(session_error_from_transport)?;
493                let handshake_result = handshake_as_initiator(
494                    &link.tx,
495                    &mut link.rx,
496                    config.root_settings.clone(),
497                    true,
498                    None,
499                    metadata_into_owned(config.metadata.clone()),
500                )
501                .await
502                .map_err(session_error_from_handshake)?;
503                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
504                    .map_err(SessionError::Protocol)?;
505                let builder = SessionInitiatorBuilder::new(
506                    BareConduit::with_message_plan(link, message_plan),
507                    handshake_result,
508                );
509                let recoverer = Box::new(BareSourceRecoverer {
510                    source,
511                    settings: config.root_settings.clone(),
512                    connect_timeout: config.connect_timeout,
513                    metadata: metadata_into_owned(config.metadata.clone()),
514                });
515                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
516                    builder,
517                    config,
518                    Some(recoverer),
519                )
520                .establish()
521                .await
522            }
523            TransportMode::Stable => {
524                let attachment = source.next_link().await.map_err(SessionError::Io)?;
525                let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
526                    .await
527                    .map_err(session_error_from_transport)?;
528                let handshake_result = handshake_as_initiator(
529                    &link.tx,
530                    &mut link.rx,
531                    config.root_settings.clone(),
532                    true,
533                    None,
534                    metadata_into_owned(config.metadata.clone()),
535                )
536                .await
537                .map_err(session_error_from_handshake)?;
538                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
539                    .map_err(SessionError::Protocol)?;
540                let conduit = StableConduit::<MessageFamily, _>::with_first_link(
541                    link.tx,
542                    link.rx,
543                    None,
544                    TransportedLinkSource {
545                        source,
546                        mode: TransportMode::Stable,
547                    },
548                )
549                .await
550                .map_err(|error| {
551                    SessionError::Protocol(format!("stable conduit setup failed: {error}"))
552                })?
553                .with_message_plan(message_plan);
554                let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
555                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
556                    builder, config, None,
557                )
558                .establish()
559                .await
560            }
561        }
562    }
563}
564
565pub struct SessionTransportInitiatorBuilder<'a, L> {
566    link: L,
567    mode: TransportMode,
568    config: SessionConfig<'a>,
569}
570
571impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
572    fn new(link: L, mode: TransportMode) -> Self {
573        let config = SessionConfig {
574            resumable: false,
575            ..SessionConfig::default()
576        };
577        Self { link, mode, config }
578    }
579
580    pub fn parity(mut self, parity: Parity) -> Self {
581        self.config.root_settings.parity = parity;
582        self
583    }
584
585    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
586        self.config.root_settings = settings;
587        self
588    }
589
590    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
591        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
592        self
593    }
594
595    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
596        self.config.metadata = metadata;
597        self
598    }
599
600    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
601        self.config.on_connection = Some(Arc::new(acceptor));
602        self
603    }
604
605    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
606        self.config.keepalive = Some(keepalive);
607        self
608    }
609
610    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
611        self.config.connect_timeout = Some(timeout);
612        self
613    }
614
615    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
616        self.config.recovery_timeout = Some(timeout);
617        self
618    }
619
620    pub fn resumable(mut self) -> Self {
621        self.config.resumable = true;
622        self
623    }
624
625    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
626        self.config.operation_store = Some(operation_store);
627        self
628    }
629
630    #[cfg(not(target_arch = "wasm32"))]
631    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
632        self.config.spawn_fn = Box::new(f);
633        self
634    }
635
636    #[cfg(target_arch = "wasm32")]
637    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
638        self.config.spawn_fn = Box::new(f);
639        self
640    }
641
642    #[cfg(not(target_arch = "wasm32"))]
643    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
644    where
645        L: Link + Send + 'static,
646        L::Tx: MaybeSend + MaybeSync + 'static,
647        L::Rx: MaybeSend + 'static,
648    {
649        let connect_timeout = self.config.connect_timeout;
650        let fut = self.establish_inner::<Client>();
651        match connect_timeout {
652            Some(timeout) => tokio::time::timeout(timeout, fut)
653                .await
654                .map_err(|_| SessionError::ConnectTimeout)?,
655            None => fut.await,
656        }
657    }
658
659    #[cfg(not(target_arch = "wasm32"))]
660    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
661    where
662        L: Link + Send + 'static,
663        L::Tx: MaybeSend + MaybeSync + 'static,
664        L::Rx: MaybeSend + 'static,
665    {
666        let Self {
667            link,
668            mode,
669            mut config,
670        } = self;
671        inject_service_metadata::<Client>(&mut config.metadata);
672        match mode {
673            TransportMode::Bare => {
674                let link = initiate_transport(link, TransportMode::Bare)
675                    .await
676                    .map_err(session_error_from_transport)?;
677                Self::finish_with_bare_parts(link, config).await
678            }
679            TransportMode::Stable => {
680                let link = initiate_transport(link, TransportMode::Stable)
681                    .await
682                    .map_err(session_error_from_transport)?;
683                Self::finish_with_stable_parts(link, config).await
684            }
685        }
686    }
687
688    #[cfg(target_arch = "wasm32")]
689    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
690    where
691        L: Link + 'static,
692        L::Tx: MaybeSend + MaybeSync + 'static,
693        L::Rx: MaybeSend + 'static,
694    {
695        let Self {
696            link,
697            mode,
698            mut config,
699        } = self;
700        inject_service_metadata::<Client>(&mut config.metadata);
701        match mode {
702            TransportMode::Bare => {
703                let link = initiate_transport(link, TransportMode::Bare)
704                    .await
705                    .map_err(session_error_from_transport)?;
706                Self::finish_with_bare_parts(link, config).await
707            }
708            TransportMode::Stable => Err(SessionError::Protocol(
709                "stable conduit transport selection is unsupported on wasm".into(),
710            )),
711        }
712    }
713
714    async fn finish_with_bare_parts<Client: FromVoxSession>(
715        mut link: SplitLink<L::Tx, L::Rx>,
716        config: SessionConfig<'a>,
717    ) -> Result<Client, SessionError>
718    where
719        L: Link + 'static,
720        L::Tx: MaybeSend + MaybeSync + 'static,
721        L::Rx: MaybeSend + 'static,
722    {
723        let handshake_result = handshake_as_initiator(
724            &link.tx,
725            &mut link.rx,
726            config.root_settings.clone(),
727            true,
728            None,
729            metadata_into_owned(config.metadata.clone()),
730        )
731        .await
732        .map_err(session_error_from_handshake)?;
733        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
734            .map_err(SessionError::Protocol)?;
735        let builder = SessionInitiatorBuilder::new(
736            BareConduit::with_message_plan(link, message_plan),
737            handshake_result,
738        );
739        Self::apply_common_parts(builder, config, None)
740            .establish()
741            .await
742    }
743
744    #[cfg(not(target_arch = "wasm32"))]
745    async fn finish_with_stable_parts<Client: FromVoxSession>(
746        mut link: SplitLink<L::Tx, L::Rx>,
747        config: SessionConfig<'a>,
748    ) -> Result<Client, SessionError>
749    where
750        L: Link + Send + 'static,
751        L::Tx: MaybeSend + MaybeSync + Send + 'static,
752        L::Rx: MaybeSend + Send + 'static,
753    {
754        let handshake_result = handshake_as_initiator(
755            &link.tx,
756            &mut link.rx,
757            config.root_settings.clone(),
758            true,
759            None,
760            metadata_into_owned(config.metadata.clone()),
761        )
762        .await
763        .map_err(session_error_from_handshake)?;
764        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
765            .map_err(SessionError::Protocol)?;
766        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
767            link.tx,
768            link.rx,
769            None,
770            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
771        )
772        .await
773        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
774        .with_message_plan(message_plan);
775        let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
776        Self::apply_common_parts(builder, config, None)
777            .establish()
778            .await
779    }
780
781    #[allow(clippy::too_many_arguments)]
782    fn apply_common_parts<C>(
783        mut builder: SessionInitiatorBuilder<'a, C>,
784        config: SessionConfig<'a>,
785        recoverer: Option<Box<dyn ConduitRecoverer>>,
786    ) -> SessionInitiatorBuilder<'a, C> {
787        builder.config = config;
788        builder.recoverer = recoverer;
789        builder
790    }
791}
792
793struct BareSourceRecoverer<S> {
794    source: S,
795    settings: ConnectionSettings,
796    connect_timeout: Option<Duration>,
797    metadata: Metadata<'static>,
798}
799
800const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
801const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
802
803impl<S> ConduitRecoverer for BareSourceRecoverer<S>
804where
805    S: LinkSource,
806    S::Link: Link + MaybeSend + 'static,
807    <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
808    <S::Link as Link>::Rx: MaybeSend + 'static,
809{
810    fn next_conduit<'a>(
811        &'a mut self,
812        resume_key: Option<&'a SessionResumeKey>,
813    ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
814        Box::pin(async move {
815            let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
816            let mut use_resume_key = resume_key.is_some();
817
818            loop {
819                let selected_resume_key = if use_resume_key { resume_key } else { None };
820
821                let attempt = async {
822                    let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
823                    let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
824                        .await
825                        .map_err(session_error_from_transport)?;
826                    let handshake_result = handshake_as_initiator(
827                        &link.tx,
828                        &mut link.rx,
829                        self.settings.clone(),
830                        true,
831                        selected_resume_key,
832                        metadata_into_owned(self.metadata.clone()),
833                    )
834                    .await
835                    .map_err(session_error_from_handshake)?;
836                    let conduit = BareConduit::<MessageFamily, _>::new(link);
837                    let (tx, rx) = conduit.split();
838                    Ok(super::RecoveredConduit {
839                        tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
840                        rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
841                        handshake: handshake_result,
842                    })
843                };
844
845                let result = match self.connect_timeout {
846                    Some(timeout) => match time::timeout(timeout, attempt).await {
847                        Ok(r) => r,
848                        Err(_) => Err(SessionError::ConnectTimeout),
849                    },
850                    None => attempt.await,
851                };
852
853                match result {
854                    Ok(conduit) => return Ok(conduit),
855                    Err(e) if !e.is_retryable() => return Err(e),
856                    Err(_) => {}
857                }
858
859                if use_resume_key {
860                    // If a resumption attempt is rejected once, continue trying without
861                    // a resume key so restart scenarios can establish a fresh session.
862                    use_resume_key = false;
863                }
864
865                time::sleep(backoff).await;
866                backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
867            }
868        })
869    }
870}
871
872struct TransportedLinkSource<S> {
873    source: S,
874    mode: TransportMode,
875}
876
877impl<S> LinkSource for TransportedLinkSource<S>
878where
879    S: LinkSource,
880    S::Link: Link + MaybeSend + 'static,
881    <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
882    <S::Link as Link>::Rx: MaybeSend + 'static,
883{
884    type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
885
886    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
887        let attachment = self.source.next_link().await?;
888        let link = initiate_transport(attachment.into_link(), self.mode)
889            .await
890            .map_err(std::io::Error::other)?;
891        Ok(Attachment::initiator(link))
892    }
893}
894
895pub struct SessionAcceptorBuilder<'a, C> {
896    conduit: C,
897    handshake_result: HandshakeResult,
898    config: SessionConfig<'a>,
899}
900
901impl<'a, C> SessionAcceptorBuilder<'a, C> {
902    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
903        let root_settings = handshake_result.our_settings.clone();
904        let mut config = SessionConfig::with_settings(root_settings);
905        // Conduit builders default to non-resumable — callers opt in with .resumable()
906        config.resumable = false;
907        Self {
908            conduit,
909            handshake_result,
910            config,
911        }
912    }
913
914    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
915        self.config.on_connection = Some(Arc::new(acceptor));
916        self
917    }
918
919    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
920        self.config.keepalive = Some(keepalive);
921        self
922    }
923
924    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
925        self.config.connect_timeout = Some(timeout);
926        self
927    }
928
929    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
930        self.config.recovery_timeout = Some(timeout);
931        self
932    }
933
934    pub fn resumable(mut self) -> Self {
935        self.config.resumable = true;
936        self
937    }
938
939    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
940        self.config.session_registry = Some(session_registry);
941        self
942    }
943
944    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
945        self.config.operation_store = Some(operation_store);
946        self
947    }
948
949    /// Override the function used to spawn the session background task.
950    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
951    #[cfg(not(target_arch = "wasm32"))]
952    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
953        self.config.spawn_fn = Box::new(f);
954        self
955    }
956
957    /// Override the function used to spawn the session background task.
958    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
959    #[cfg(target_arch = "wasm32")]
960    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
961        self.config.spawn_fn = Box::new(f);
962        self
963    }
964
965    #[moire::instrument]
966    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
967    where
968        C: Conduit<Msg = MessageFamily> + 'static,
969        C::Tx: MaybeSend + MaybeSync + 'static,
970        C::Rx: MaybeSend + 'static,
971    {
972        let Self {
973            conduit,
974            mut handshake_result,
975            config,
976        } = self;
977        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
978        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
979        let (tx, rx) = conduit.split();
980        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
981        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
982        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
983        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
984        let acceptor: Arc<dyn ConnectionAcceptor> =
985            config.on_connection.unwrap_or_else(|| Arc::new(()));
986        let mut session = Session::pre_handshake(
987            tx,
988            rx,
989            Some(acceptor.clone()),
990            open_rx,
991            close_rx,
992            resume_rx,
993            control_tx.clone(),
994            control_rx,
995            config.keepalive,
996            config.resumable,
997            None,
998            config.recovery_timeout,
999        );
1000        let handle = session.establish_from_handshake(handshake_result)?;
1001        let resume_key = session.resume_key();
1002        let session_handle = SessionHandle {
1003            open_tx,
1004            close_tx,
1005            resume_tx,
1006            control_tx,
1007            resume_key,
1008        };
1009        if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1010            registry.insert(key, session_handle.clone());
1011            session.registered_in_registry = true;
1012        }
1013        // Route the root connection through the acceptor.
1014        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1015        let pending = super::PendingConnection::with_caller_slot(
1016            handle,
1017            caller_slot.clone(),
1018            config.operation_store,
1019        );
1020        peer_metadata.push(vox_types::MetadataEntry::str(
1021            VOX_SERVICE_METADATA_KEY,
1022            Client::SERVICE_NAME,
1023        ));
1024        let request = super::ConnectionRequest::new(&peer_metadata)?;
1025        acceptor
1026            .accept(&request, pending)
1027            .map_err(SessionError::Rejected)?;
1028        let caller =
1029            caller_slot.lock().unwrap().take().expect(
1030                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1031            );
1032        let client = Client::from_vox_session(caller, Some(session_handle));
1033        (config.spawn_fn)(Box::pin(async move { session.run().await }));
1034        Ok(client)
1035    }
1036
1037    #[moire::instrument]
1038    pub async fn establish_or_resume<Client: FromVoxSession>(
1039        self,
1040    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1041    where
1042        C: Conduit<Msg = MessageFamily> + 'static,
1043        C::Tx: MaybeSend + MaybeSync + 'static,
1044        C::Rx: MaybeSend + 'static,
1045    {
1046        // With the CBOR handshake, resume detection happens at the link level
1047        // before the conduit is created. If the peer sent a resume key in the Hello
1048        // that matches a known session, we resume. Otherwise, we establish.
1049        if let (Some(registry), Some(resume_key)) = (
1050            &self.config.session_registry,
1051            self.handshake_result.peer_resume_key,
1052        ) && let Some(handle) = registry.get(&resume_key)
1053        {
1054            let (tx, rx) = self.conduit.split();
1055            if let Err(error) = handle
1056                .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1057                .await
1058            {
1059                registry.remove(&resume_key);
1060                return Err(error);
1061            }
1062            return Ok(SessionAcceptOutcome::Resumed);
1063        }
1064
1065        let client = self.establish().await?;
1066        Ok(SessionAcceptOutcome::Established(client))
1067    }
1068}
1069
1070pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1071    link: L,
1072    config: SessionConfig<'a>,
1073}
1074
1075impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1076    fn new(link: L) -> Self {
1077        Self {
1078            link,
1079            config: SessionConfig::with_settings(ConnectionSettings {
1080                parity: Parity::Even,
1081                max_concurrent_requests: 64,
1082            }),
1083        }
1084    }
1085
1086    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1087        self.config.root_settings = settings;
1088        self
1089    }
1090
1091    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1092        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1093        self
1094    }
1095
1096    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1097        self.config.metadata = metadata;
1098        self
1099    }
1100
1101    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1102        self.config.on_connection = Some(Arc::new(acceptor));
1103        self
1104    }
1105
1106    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1107        self.config.keepalive = Some(keepalive);
1108        self
1109    }
1110
1111    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1112        self.config.connect_timeout = Some(timeout);
1113        self
1114    }
1115
1116    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1117        self.config.recovery_timeout = Some(timeout);
1118        self
1119    }
1120
1121    pub fn resumable(mut self) -> Self {
1122        self.config.resumable = true;
1123        self
1124    }
1125
1126    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1127        self.config.session_registry = Some(session_registry);
1128        self
1129    }
1130
1131    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1132        self.config.operation_store = Some(operation_store);
1133        self
1134    }
1135
1136    #[cfg(not(target_arch = "wasm32"))]
1137    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1138        self.config.spawn_fn = Box::new(f);
1139        self
1140    }
1141
1142    #[cfg(target_arch = "wasm32")]
1143    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1144        self.config.spawn_fn = Box::new(f);
1145        self
1146    }
1147
1148    #[moire::instrument]
1149    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1150    where
1151        L: Link + MaybeSend + 'static,
1152        L::Tx: MaybeSend + MaybeSync + 'static,
1153        L::Rx: MaybeSend + 'static,
1154    {
1155        let Self { link, mut config } = self;
1156        inject_service_metadata::<Client>(&mut config.metadata);
1157        let (mode, mut link) = accept_transport(link)
1158            .await
1159            .map_err(session_error_from_transport)?;
1160        match mode {
1161            TransportMode::Bare => {
1162                let handshake_result = handshake_as_acceptor(
1163                    &link.tx,
1164                    &mut link.rx,
1165                    config.root_settings.clone(),
1166                    true,
1167                    config.resumable,
1168                    None,
1169                    metadata_into_owned(config.metadata.clone()),
1170                )
1171                .await
1172                .map_err(session_error_from_handshake)?;
1173                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1174                    .map_err(SessionError::Protocol)?;
1175                let builder = SessionAcceptorBuilder::new(
1176                    BareConduit::with_message_plan(link, message_plan),
1177                    handshake_result,
1178                );
1179                Self::apply_common_parts(builder, config).establish().await
1180            }
1181            TransportMode::Stable => Self::finish_with_stable_parts(link, config).await,
1182        }
1183    }
1184
1185    #[moire::instrument]
1186    pub async fn establish_or_resume<Client: FromVoxSession>(
1187        self,
1188    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1189    where
1190        L: Link + MaybeSend + 'static,
1191        L::Tx: MaybeSend + MaybeSync + 'static,
1192        L::Rx: MaybeSend + 'static,
1193    {
1194        let Self { link, config } = self;
1195        let (mode, mut link) = accept_transport(link)
1196            .await
1197            .map_err(session_error_from_transport)?;
1198        match mode {
1199            TransportMode::Bare => {
1200                let handshake_result = handshake_as_acceptor(
1201                    &link.tx,
1202                    &mut link.rx,
1203                    config.root_settings.clone(),
1204                    true,
1205                    config.resumable,
1206                    None,
1207                    metadata_into_owned(config.metadata.clone()),
1208                )
1209                .await
1210                .map_err(session_error_from_handshake)?;
1211                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1212                    .map_err(SessionError::Protocol)?;
1213                let builder = SessionAcceptorBuilder::new(
1214                    BareConduit::with_message_plan(link, message_plan),
1215                    handshake_result,
1216                );
1217                Self::apply_common_parts(builder, config)
1218                    .establish_or_resume()
1219                    .await
1220            }
1221            TransportMode::Stable => Self::finish_with_stable_parts(link, config)
1222                .await
1223                .map(SessionAcceptOutcome::Established),
1224        }
1225    }
1226
1227    async fn finish_with_stable_parts<Client: FromVoxSession>(
1228        mut link: SplitLink<L::Tx, L::Rx>,
1229        config: SessionConfig<'a>,
1230    ) -> Result<Client, SessionError>
1231    where
1232        L: Link + MaybeSend + 'static,
1233        L::Tx: MaybeSend + MaybeSync + 'static,
1234        L::Rx: MaybeSend + 'static,
1235    {
1236        let handshake_result = handshake_as_acceptor(
1237            &link.tx,
1238            &mut link.rx,
1239            config.root_settings.clone(),
1240            true,
1241            config.resumable,
1242            None,
1243            metadata_into_owned(config.metadata.clone()),
1244        )
1245        .await
1246        .map_err(session_error_from_handshake)?;
1247        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1248            .map_err(SessionError::Protocol)?;
1249        let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1250            .await
1251            .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1252        let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1253            link.tx,
1254            link.rx,
1255            Some(client_hello),
1256            crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1257        )
1258        .await
1259        .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1260        .with_message_plan(message_plan);
1261        let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1262        Self::apply_common_parts(builder, config).establish().await
1263    }
1264
1265    fn apply_common_parts<C>(
1266        mut builder: SessionAcceptorBuilder<'a, C>,
1267        config: SessionConfig<'a>,
1268    ) -> SessionAcceptorBuilder<'a, C> {
1269        builder.config = config;
1270        builder
1271    }
1272}
1273
1274fn validate_negotiated_root_settings(
1275    expected_root_settings: &ConnectionSettings,
1276    handshake_result: &HandshakeResult,
1277) -> Result<(), SessionError> {
1278    if handshake_result.our_settings != *expected_root_settings {
1279        return Err(SessionError::Protocol(
1280            "negotiated root settings do not match builder settings".into(),
1281        ));
1282    }
1283    Ok(())
1284}
1285
1286fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1287    match error {
1288        crate::HandshakeError::Io(io) => SessionError::Io(io),
1289        crate::HandshakeError::PeerClosed => {
1290            SessionError::Protocol("peer closed during handshake".into())
1291        }
1292        crate::HandshakeError::NotResumable => SessionError::NotResumable,
1293        other => SessionError::Protocol(other.to_string()),
1294    }
1295}
1296
1297fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1298    match error {
1299        crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1300        crate::TransportPrologueError::LinkDead => {
1301            SessionError::Protocol("link closed during transport prologue".into())
1302        }
1303        crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1304        crate::TransportPrologueError::Rejected(reason) => {
1305            SessionError::Protocol(format!("transport rejected: {reason}"))
1306        }
1307    }
1308}