Skip to main content

vox_core/session/
builders.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12    Conduit, ConnectionSettings, DEFAULT_INITIAL_CHANNEL_CREDIT, HandshakeResult, Link, MaybeSend,
13    MaybeSync, MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink, VoxObserver,
14    VoxObserverHandle, metadata_into_owned,
15};
16
17use crate::LinkSource;
18use crate::{
19    BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
20    handshake_as_acceptor, handshake_as_initiator, initiate_transport,
21};
22
23use super::{
24    CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
25    SessionHandle, SessionKeepaliveConfig,
26};
27use crate::FromVoxSession;
28
29/// Well-known metadata key for service name routing.
30pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
31
32/// Inject `vox-service` metadata from `Client::SERVICE_NAME`.
33fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
34    metadata.push(vox_types::MetadataEntry {
35        key: VOX_SERVICE_METADATA_KEY.into(),
36        value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
37        flags: vox_types::MetadataFlags::NONE,
38    });
39}
40
41/// A pinned, boxed session future. On non-WASM this is `Send + 'static`;
42/// on WASM it's `'static` only (no `Send` requirement).
43#[cfg(not(target_arch = "wasm32"))]
44pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
45#[cfg(target_arch = "wasm32")]
46pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
47
48#[cfg(not(target_arch = "wasm32"))]
49type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
50#[cfg(target_arch = "wasm32")]
51type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
52
53#[cfg(not(target_arch = "wasm32"))]
54fn default_spawn_fn() -> SpawnFn {
55    Box::new(|fut| {
56        tokio::spawn(fut);
57    })
58}
59
60#[cfg(target_arch = "wasm32")]
61fn default_spawn_fn() -> SpawnFn {
62    Box::new(|fut| {
63        wasm_bindgen_futures::spawn_local(fut);
64    })
65}
66
67// r[impl rpc.session-setup]
68// r[impl session.role]
69pub fn initiator_conduit<I: IntoConduit>(
70    into_conduit: I,
71    handshake_result: HandshakeResult,
72) -> SessionInitiatorBuilder<'static, I::Conduit> {
73    SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
74}
75
76pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
77where
78    S: LinkSource,
79{
80    SessionSourceInitiatorBuilder::new(source, mode)
81}
82
83pub fn acceptor_conduit<I: IntoConduit>(
84    into_conduit: I,
85    handshake_result: HandshakeResult,
86) -> SessionAcceptorBuilder<'static, I::Conduit> {
87    SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
88}
89
90/// Convenience: perform CBOR handshake as initiator on a raw link, then return
91/// a builder with the conduit ready to go.
92pub async fn initiator_on_link<L: Link>(
93    link: L,
94    settings: ConnectionSettings,
95) -> Result<
96    SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
97    SessionError,
98>
99where
100    L::Tx: MaybeSend + MaybeSync + 'static,
101    L::Rx: MaybeSend + 'static,
102{
103    let (tx, mut rx) = link.split();
104    let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
105        .await
106        .map_err(session_error_from_handshake)?;
107    let message_plan =
108        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
109    Ok(SessionInitiatorBuilder::new(
110        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
111        handshake_result,
112    ))
113}
114
115/// Convenience: perform CBOR handshake as acceptor on a raw link, then return
116/// a builder with the conduit ready to go.
117pub async fn acceptor_on_link<L: Link>(
118    link: L,
119    settings: ConnectionSettings,
120) -> Result<
121    SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
122    SessionError,
123>
124where
125    L::Tx: MaybeSend + MaybeSync + 'static,
126    L::Rx: MaybeSend + 'static,
127{
128    let (tx, mut rx) = link.split();
129    let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
130        .await
131        .map_err(session_error_from_handshake)?;
132    let message_plan =
133        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
134    Ok(SessionAcceptorBuilder::new(
135        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
136        handshake_result,
137    ))
138}
139
140pub fn initiator_on<L: Link>(
141    link: L,
142    mode: TransportMode,
143) -> SessionTransportInitiatorBuilder<'static, L> {
144    SessionTransportInitiatorBuilder::new(link, mode)
145}
146
147pub fn initiator_transport<L: Link>(
148    link: L,
149    mode: TransportMode,
150) -> SessionTransportInitiatorBuilder<'static, L> {
151    initiator_on(link, mode)
152}
153
154pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
155    SessionTransportAcceptorBuilder::new(link)
156}
157
158pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
159    acceptor_on(link)
160}
161
162#[derive(Clone, Default)]
163pub struct SessionRegistry {
164    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
165}
166
167impl SessionRegistry {
168    fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
169        self.inner
170            .lock()
171            .expect("session registry poisoned")
172            .get(key)
173            .cloned()
174    }
175
176    fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
177        self.inner
178            .lock()
179            .expect("session registry poisoned")
180            .insert(key, handle);
181    }
182
183    fn remove(&self, key: &SessionResumeKey) {
184        self.inner
185            .lock()
186            .expect("session registry poisoned")
187            .remove(key);
188    }
189}
190
191pub enum SessionAcceptOutcome<Client> {
192    Established(Client),
193    Resumed,
194}
195
196/// Shared configuration for all session builders.
197pub struct SessionConfig<'a> {
198    pub root_settings: ConnectionSettings,
199    pub metadata: Metadata<'a>,
200    pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
201    pub keepalive: Option<SessionKeepaliveConfig>,
202    pub resumable: bool,
203    pub session_registry: Option<SessionRegistry>,
204    pub operation_store: Option<Arc<dyn OperationStore>>,
205    pub spawn_fn: SpawnFn,
206    pub connect_timeout: Option<std::time::Duration>,
207    pub recovery_timeout: Option<std::time::Duration>,
208    pub observer: Option<VoxObserverHandle>,
209}
210
211impl SessionConfig<'_> {
212    fn with_settings(root_settings: ConnectionSettings) -> Self {
213        Self {
214            root_settings,
215            metadata: vec![],
216            on_connection: None,
217            keepalive: None,
218            resumable: true,
219            session_registry: None,
220            operation_store: None,
221            spawn_fn: default_spawn_fn(),
222            connect_timeout: None,
223            recovery_timeout: None,
224            observer: None,
225        }
226    }
227}
228
229impl Default for SessionConfig<'_> {
230    fn default() -> Self {
231        Self::with_settings(ConnectionSettings {
232            parity: Parity::Odd,
233            max_concurrent_requests: 64,
234            initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
235        })
236    }
237}
238
239pub struct SessionInitiatorBuilder<'a, C> {
240    conduit: C,
241    handshake_result: HandshakeResult,
242    config: SessionConfig<'a>,
243    recoverer: Option<Box<dyn ConduitRecoverer>>,
244}
245
246impl<'a, C> SessionInitiatorBuilder<'a, C> {
247    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
248        let root_settings = handshake_result.our_settings.clone();
249        let mut config = SessionConfig::with_settings(root_settings);
250        // Conduit builders default to non-resumable — callers opt in with .resumable()
251        config.resumable = false;
252        Self {
253            conduit,
254            handshake_result,
255            config,
256            recoverer: None,
257        }
258    }
259
260    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
261        self.config.on_connection = Some(Arc::new(acceptor));
262        self
263    }
264
265    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
266        self.config.keepalive = Some(keepalive);
267        self
268    }
269
270    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
271        self.config.root_settings.initial_channel_credit = channel_capacity;
272        self
273    }
274
275    // r[impl rpc.observability.runtime]
276    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
277        self.config.observer = Some(Arc::new(observer));
278        self
279    }
280
281    // r[impl rpc.observability.runtime]
282    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
283        self.config.observer = Some(observer);
284        self
285    }
286
287    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
288        self.config.connect_timeout = Some(timeout);
289        self
290    }
291
292    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
293        self.config.recovery_timeout = Some(timeout);
294        self
295    }
296
297    pub fn resumable(mut self) -> Self {
298        self.config.resumable = true;
299        self
300    }
301
302    /// Disable session resumability. Useful for IPC transports where
303    /// the peer is a process: when the process exits the connection
304    /// is gone for good, there's nothing to resume against, and
305    /// keeping the session alive in recovery mode just leaks
306    /// per-channel state (e.g. server-side `Tx<T>`s never observing
307    /// that the client disconnected).
308    pub fn non_resumable(mut self) -> Self {
309        self.config.resumable = false;
310        self
311    }
312
313    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
314        self.config.operation_store = Some(operation_store);
315        self
316    }
317
318    /// Override the function used to spawn the session background task.
319    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
320    #[cfg(not(target_arch = "wasm32"))]
321    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
322        self.config.spawn_fn = Box::new(f);
323        self
324    }
325
326    /// Override the function used to spawn the session background task.
327    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
328    #[cfg(target_arch = "wasm32")]
329    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
330        self.config.spawn_fn = Box::new(f);
331        self
332    }
333
334    /// Establish a session using the given settings, on the given link source, etc,
335    ///
336    ///   - requiring (as an arg) a handler for the service the local peer will serve
337    ///   - returning a caller for the service we expect the remote peer to serve
338    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
339    where
340        C: Conduit<Msg = MessageFamily> + 'static,
341        C::Tx: MaybeSend + MaybeSync + 'static,
342        C::Rx: MaybeSend + 'static,
343    {
344        let Self {
345            conduit,
346            mut handshake_result,
347            config,
348            recoverer,
349        } = self;
350        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
351        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
352        let (tx, rx) = conduit.split();
353        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
354        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
355        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
356        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
357        let acceptor: Arc<dyn ConnectionAcceptor> =
358            config.on_connection.unwrap_or_else(|| Arc::new(()));
359        let mut session = Session::pre_handshake(
360            tx,
361            rx,
362            Some(acceptor.clone()),
363            open_rx,
364            close_rx,
365            resume_rx,
366            control_tx.clone(),
367            control_rx,
368            config.keepalive,
369            config.resumable,
370            recoverer,
371            config.recovery_timeout,
372            config.observer.clone(),
373        );
374        let handle = session.establish_from_handshake(handshake_result)?;
375        let resume_key = session.resume_key();
376        let session_handle = SessionHandle {
377            open_tx,
378            close_tx,
379            resume_tx,
380            control_tx,
381            resume_key,
382        };
383        // Route the root connection through the acceptor.
384        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
385        let pending = super::PendingConnection::with_caller_slot(
386            handle,
387            caller_slot.clone(),
388            config.operation_store,
389        );
390        peer_metadata.push(vox_types::MetadataEntry::str(
391            VOX_SERVICE_METADATA_KEY,
392            Client::SERVICE_NAME,
393        ));
394        let request = super::ConnectionRequest::new(&peer_metadata)?;
395        tracing::debug!(
396            service = Client::SERVICE_NAME,
397            "vox root connection routing to acceptor"
398        );
399        match acceptor.accept(&request, pending) {
400            Ok(()) => tracing::debug!(
401                service = Client::SERVICE_NAME,
402                "vox root connection accepted"
403            ),
404            Err(metadata) => {
405                tracing::debug!(
406                    service = Client::SERVICE_NAME,
407                    metadata_len = metadata.len(),
408                    "vox root connection rejected"
409                );
410                return Err(SessionError::Rejected(metadata));
411            }
412        }
413        let caller =
414            caller_slot.lock().unwrap().take().expect(
415                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
416            );
417        let client = Client::from_vox_session(caller, Some(session_handle));
418        (config.spawn_fn)(Box::pin(async move { session.run().await }));
419        Ok(client)
420    }
421}
422
423pub struct SessionSourceInitiatorBuilder<'a, S> {
424    source: S,
425    mode: TransportMode,
426    config: SessionConfig<'a>,
427}
428
429impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
430    fn new(source: S, mode: TransportMode) -> Self {
431        let config = SessionConfig {
432            resumable: false,
433            ..SessionConfig::default()
434        };
435        Self {
436            source,
437            mode,
438            config,
439        }
440    }
441
442    pub fn parity(mut self, parity: Parity) -> Self {
443        self.config.root_settings.parity = parity;
444        self
445    }
446
447    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
448        self.config.root_settings = settings;
449        self
450    }
451
452    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
453        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
454        self
455    }
456
457    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
458        self.config.root_settings.initial_channel_credit = channel_capacity;
459        self
460    }
461
462    // r[impl rpc.observability.runtime]
463    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
464        self.config.observer = Some(Arc::new(observer));
465        self
466    }
467
468    // r[impl rpc.observability.runtime]
469    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
470        self.config.observer = Some(observer);
471        self
472    }
473
474    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
475        self.config.metadata = metadata;
476        self
477    }
478
479    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
480        self.config.on_connection = Some(Arc::new(acceptor));
481        self
482    }
483
484    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
485        self.config.keepalive = Some(keepalive);
486        self
487    }
488
489    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
490        self.config.connect_timeout = Some(timeout);
491        self
492    }
493
494    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
495        self.config.recovery_timeout = Some(timeout);
496        self
497    }
498
499    pub fn resumable(mut self) -> Self {
500        self.config.resumable = true;
501        self
502    }
503
504    /// Disable session resumability. Useful for IPC transports where
505    /// the peer is a process: when the process exits the connection
506    /// is gone for good, there's nothing to resume against, and
507    /// keeping the session alive in recovery mode just leaks
508    /// per-channel state (e.g. server-side `Tx<T>`s never observing
509    /// that the client disconnected).
510    pub fn non_resumable(mut self) -> Self {
511        self.config.resumable = false;
512        self
513    }
514
515    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
516        self.config.operation_store = Some(operation_store);
517        self
518    }
519
520    #[cfg(not(target_arch = "wasm32"))]
521    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
522        self.config.spawn_fn = Box::new(f);
523        self
524    }
525
526    #[cfg(target_arch = "wasm32")]
527    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
528        self.config.spawn_fn = Box::new(f);
529        self
530    }
531
532    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
533    where
534        S: LinkSource,
535        S::Link: Link + MaybeSend + 'static,
536        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
537        <S::Link as Link>::Rx: MaybeSend + 'static,
538    {
539        let connect_timeout = self.config.connect_timeout;
540        let fut = self.establish_inner::<Client>();
541        match connect_timeout {
542            Some(timeout) => time::timeout(timeout, fut)
543                .await
544                .map_err(|_| SessionError::ConnectTimeout)?,
545            None => fut.await,
546        }
547    }
548
549    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
550    where
551        S: LinkSource,
552        S::Link: Link + MaybeSend + 'static,
553        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
554        <S::Link as Link>::Rx: MaybeSend + 'static,
555    {
556        let Self {
557            mut source,
558            mode,
559            mut config,
560        } = self;
561        inject_service_metadata::<Client>(&mut config.metadata);
562        let _ = mode;
563
564        {
565            {
566                let attachment = source.next_link().await.map_err(SessionError::Io)?;
567                let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
568                    .await
569                    .map_err(session_error_from_transport)?;
570                let handshake_result = handshake_as_initiator(
571                    &link.tx,
572                    &mut link.rx,
573                    config.root_settings.clone(),
574                    true,
575                    None,
576                    metadata_into_owned(config.metadata.clone()),
577                )
578                .await
579                .map_err(session_error_from_handshake)?;
580                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
581                    .map_err(SessionError::Protocol)?;
582                let builder = SessionInitiatorBuilder::new(
583                    BareConduit::with_message_plan(link, message_plan),
584                    handshake_result,
585                );
586                let recoverer = Box::new(BareSourceRecoverer {
587                    source,
588                    settings: config.root_settings.clone(),
589                    connect_timeout: config.connect_timeout,
590                    metadata: metadata_into_owned(config.metadata.clone()),
591                });
592                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
593                    builder,
594                    config,
595                    Some(recoverer),
596                )
597                .establish()
598                .await
599            }
600        }
601    }
602}
603
604pub struct SessionTransportInitiatorBuilder<'a, L> {
605    link: L,
606    mode: TransportMode,
607    config: SessionConfig<'a>,
608}
609
610impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
611    fn new(link: L, mode: TransportMode) -> Self {
612        let config = SessionConfig {
613            resumable: false,
614            ..SessionConfig::default()
615        };
616        Self { link, mode, config }
617    }
618
619    pub fn parity(mut self, parity: Parity) -> Self {
620        self.config.root_settings.parity = parity;
621        self
622    }
623
624    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
625        self.config.root_settings = settings;
626        self
627    }
628
629    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
630        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
631        self
632    }
633
634    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
635        self.config.root_settings.initial_channel_credit = channel_capacity;
636        self
637    }
638
639    // r[impl rpc.observability.runtime]
640    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
641        self.config.observer = Some(Arc::new(observer));
642        self
643    }
644
645    // r[impl rpc.observability.runtime]
646    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
647        self.config.observer = Some(observer);
648        self
649    }
650
651    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
652        self.config.metadata = metadata;
653        self
654    }
655
656    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
657        self.config.on_connection = Some(Arc::new(acceptor));
658        self
659    }
660
661    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
662        self.config.keepalive = Some(keepalive);
663        self
664    }
665
666    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
667        self.config.connect_timeout = Some(timeout);
668        self
669    }
670
671    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
672        self.config.recovery_timeout = Some(timeout);
673        self
674    }
675
676    pub fn resumable(mut self) -> Self {
677        self.config.resumable = true;
678        self
679    }
680
681    /// Disable session resumability. Useful for IPC transports where
682    /// the peer is a process: when the process exits the connection
683    /// is gone for good, there's nothing to resume against, and
684    /// keeping the session alive in recovery mode just leaks
685    /// per-channel state (e.g. server-side `Tx<T>`s never observing
686    /// that the client disconnected).
687    pub fn non_resumable(mut self) -> Self {
688        self.config.resumable = false;
689        self
690    }
691
692    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
693        self.config.operation_store = Some(operation_store);
694        self
695    }
696
697    #[cfg(not(target_arch = "wasm32"))]
698    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
699        self.config.spawn_fn = Box::new(f);
700        self
701    }
702
703    #[cfg(target_arch = "wasm32")]
704    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
705        self.config.spawn_fn = Box::new(f);
706        self
707    }
708
709    #[cfg(not(target_arch = "wasm32"))]
710    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
711    where
712        L: Link + Send + 'static,
713        L::Tx: MaybeSend + MaybeSync + 'static,
714        L::Rx: MaybeSend + 'static,
715    {
716        let connect_timeout = self.config.connect_timeout;
717        let fut = self.establish_inner::<Client>();
718        match connect_timeout {
719            Some(timeout) => vox_types::time::tokio::timeout(timeout, fut)
720                .await
721                .map_err(|_| SessionError::ConnectTimeout)?,
722            None => fut.await,
723        }
724    }
725
726    #[cfg(not(target_arch = "wasm32"))]
727    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
728    where
729        L: Link + Send + 'static,
730        L::Tx: MaybeSend + MaybeSync + 'static,
731        L::Rx: MaybeSend + 'static,
732    {
733        let Self {
734            link,
735            mode,
736            mut config,
737        } = self;
738        inject_service_metadata::<Client>(&mut config.metadata);
739        let _ = mode;
740        let link = initiate_transport(link, TransportMode::Bare)
741            .await
742            .map_err(session_error_from_transport)?;
743        Self::finish_with_bare_parts(link, config).await
744    }
745
746    #[cfg(target_arch = "wasm32")]
747    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
748    where
749        L: Link + 'static,
750        L::Tx: MaybeSend + MaybeSync + 'static,
751        L::Rx: MaybeSend + 'static,
752    {
753        let Self {
754            link,
755            mode,
756            mut config,
757        } = self;
758        inject_service_metadata::<Client>(&mut config.metadata);
759        match mode {
760            TransportMode::Bare => {
761                let link = initiate_transport(link, TransportMode::Bare)
762                    .await
763                    .map_err(session_error_from_transport)?;
764                Self::finish_with_bare_parts(link, config).await
765            }
766        }
767    }
768
769    async fn finish_with_bare_parts<Client: FromVoxSession>(
770        mut link: SplitLink<L::Tx, L::Rx>,
771        config: SessionConfig<'a>,
772    ) -> Result<Client, SessionError>
773    where
774        L: Link + 'static,
775        L::Tx: MaybeSend + MaybeSync + 'static,
776        L::Rx: MaybeSend + 'static,
777    {
778        let handshake_result = handshake_as_initiator(
779            &link.tx,
780            &mut link.rx,
781            config.root_settings.clone(),
782            true,
783            None,
784            metadata_into_owned(config.metadata.clone()),
785        )
786        .await
787        .map_err(session_error_from_handshake)?;
788        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
789            .map_err(SessionError::Protocol)?;
790        let builder = SessionInitiatorBuilder::new(
791            BareConduit::with_message_plan(link, message_plan),
792            handshake_result,
793        );
794        Self::apply_common_parts(builder, config, None)
795            .establish()
796            .await
797    }
798
799    #[allow(clippy::too_many_arguments)]
800    fn apply_common_parts<C>(
801        mut builder: SessionInitiatorBuilder<'a, C>,
802        config: SessionConfig<'a>,
803        recoverer: Option<Box<dyn ConduitRecoverer>>,
804    ) -> SessionInitiatorBuilder<'a, C> {
805        builder.config = config;
806        builder.recoverer = recoverer;
807        builder
808    }
809}
810
811struct BareSourceRecoverer<S> {
812    source: S,
813    settings: ConnectionSettings,
814    connect_timeout: Option<Duration>,
815    metadata: Metadata<'static>,
816}
817
818const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
819const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
820
821impl<S> ConduitRecoverer for BareSourceRecoverer<S>
822where
823    S: LinkSource,
824    S::Link: Link + MaybeSend + 'static,
825    <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
826    <S::Link as Link>::Rx: MaybeSend + 'static,
827{
828    fn next_conduit<'a>(
829        &'a mut self,
830        resume_key: Option<&'a SessionResumeKey>,
831    ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
832        Box::pin(async move {
833            let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
834            let mut use_resume_key = resume_key.is_some();
835
836            loop {
837                let selected_resume_key = if use_resume_key { resume_key } else { None };
838
839                let attempt = async {
840                    let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
841                    let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
842                        .await
843                        .map_err(session_error_from_transport)?;
844                    let handshake_result = handshake_as_initiator(
845                        &link.tx,
846                        &mut link.rx,
847                        self.settings.clone(),
848                        true,
849                        selected_resume_key,
850                        metadata_into_owned(self.metadata.clone()),
851                    )
852                    .await
853                    .map_err(session_error_from_handshake)?;
854                    let conduit = BareConduit::<MessageFamily, _>::new(link);
855                    let (tx, rx) = conduit.split();
856                    Ok(super::RecoveredConduit {
857                        tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
858                        rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
859                        handshake: handshake_result,
860                    })
861                };
862
863                let result = match self.connect_timeout {
864                    Some(timeout) => match time::timeout(timeout, attempt).await {
865                        Ok(r) => r,
866                        Err(_) => Err(SessionError::ConnectTimeout),
867                    },
868                    None => attempt.await,
869                };
870
871                match result {
872                    Ok(conduit) => return Ok(conduit),
873                    Err(e) if !e.is_retryable() => return Err(e),
874                    Err(_) => {}
875                }
876
877                if use_resume_key {
878                    // If a resumption attempt is rejected once, continue trying without
879                    // a resume key so restart scenarios can establish a fresh session.
880                    use_resume_key = false;
881                }
882
883                time::sleep(backoff).await;
884                backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
885            }
886        })
887    }
888}
889
890pub struct SessionAcceptorBuilder<'a, C> {
891    conduit: C,
892    handshake_result: HandshakeResult,
893    config: SessionConfig<'a>,
894}
895
896impl<'a, C> SessionAcceptorBuilder<'a, C> {
897    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
898        let root_settings = handshake_result.our_settings.clone();
899        let mut config = SessionConfig::with_settings(root_settings);
900        // Conduit builders default to non-resumable — callers opt in with .resumable()
901        config.resumable = false;
902        Self {
903            conduit,
904            handshake_result,
905            config,
906        }
907    }
908
909    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
910        self.config.on_connection = Some(Arc::new(acceptor));
911        self
912    }
913
914    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
915        self.config.keepalive = Some(keepalive);
916        self
917    }
918
919    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
920        self.config.root_settings.initial_channel_credit = channel_capacity;
921        self
922    }
923
924    // r[impl rpc.observability.runtime]
925    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
926        self.config.observer = Some(Arc::new(observer));
927        self
928    }
929
930    // r[impl rpc.observability.runtime]
931    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
932        self.config.observer = Some(observer);
933        self
934    }
935
936    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
937        self.config.connect_timeout = Some(timeout);
938        self
939    }
940
941    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
942        self.config.recovery_timeout = Some(timeout);
943        self
944    }
945
946    pub fn resumable(mut self) -> Self {
947        self.config.resumable = true;
948        self
949    }
950
951    /// Disable session resumability. Useful for IPC transports where
952    /// the peer is a process: when the process exits the connection
953    /// is gone for good, there's nothing to resume against, and
954    /// keeping the session alive in recovery mode just leaks
955    /// per-channel state (e.g. server-side `Tx<T>`s never observing
956    /// that the client disconnected).
957    pub fn non_resumable(mut self) -> Self {
958        self.config.resumable = false;
959        self
960    }
961
962    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
963        self.config.session_registry = Some(session_registry);
964        self
965    }
966
967    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
968        self.config.operation_store = Some(operation_store);
969        self
970    }
971
972    /// Override the function used to spawn the session background task.
973    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
974    #[cfg(not(target_arch = "wasm32"))]
975    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
976        self.config.spawn_fn = Box::new(f);
977        self
978    }
979
980    /// Override the function used to spawn the session background task.
981    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
982    #[cfg(target_arch = "wasm32")]
983    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
984        self.config.spawn_fn = Box::new(f);
985        self
986    }
987
988    #[moire::instrument]
989    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
990    where
991        C: Conduit<Msg = MessageFamily> + 'static,
992        C::Tx: MaybeSend + MaybeSync + 'static,
993        C::Rx: MaybeSend + 'static,
994    {
995        let Self {
996            conduit,
997            mut handshake_result,
998            config,
999        } = self;
1000        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
1001        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
1002        let (tx, rx) = conduit.split();
1003        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
1004        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
1005        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
1006        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
1007        let acceptor: Arc<dyn ConnectionAcceptor> =
1008            config.on_connection.unwrap_or_else(|| Arc::new(()));
1009        let mut session = Session::pre_handshake(
1010            tx,
1011            rx,
1012            Some(acceptor.clone()),
1013            open_rx,
1014            close_rx,
1015            resume_rx,
1016            control_tx.clone(),
1017            control_rx,
1018            config.keepalive,
1019            config.resumable,
1020            None,
1021            config.recovery_timeout,
1022            config.observer.clone(),
1023        );
1024        let handle = session.establish_from_handshake(handshake_result)?;
1025        let resume_key = session.resume_key();
1026        let session_handle = SessionHandle {
1027            open_tx,
1028            close_tx,
1029            resume_tx,
1030            control_tx,
1031            resume_key,
1032        };
1033        if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1034            registry.insert(key, session_handle.clone());
1035            session.registered_in_registry = true;
1036        }
1037        // Route the root connection through the acceptor.
1038        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1039        let pending = super::PendingConnection::with_caller_slot(
1040            handle,
1041            caller_slot.clone(),
1042            config.operation_store,
1043        );
1044        peer_metadata.push(vox_types::MetadataEntry::str(
1045            VOX_SERVICE_METADATA_KEY,
1046            Client::SERVICE_NAME,
1047        ));
1048        let request = super::ConnectionRequest::new(&peer_metadata)?;
1049        tracing::debug!(
1050            service = Client::SERVICE_NAME,
1051            "vox root connection routing to acceptor"
1052        );
1053        match acceptor.accept(&request, pending) {
1054            Ok(()) => tracing::debug!(
1055                service = Client::SERVICE_NAME,
1056                "vox root connection accepted"
1057            ),
1058            Err(metadata) => {
1059                tracing::debug!(
1060                    service = Client::SERVICE_NAME,
1061                    metadata_len = metadata.len(),
1062                    "vox root connection rejected"
1063                );
1064                return Err(SessionError::Rejected(metadata));
1065            }
1066        }
1067        let caller =
1068            caller_slot.lock().unwrap().take().expect(
1069                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1070            );
1071        let client = Client::from_vox_session(caller, Some(session_handle));
1072        (config.spawn_fn)(Box::pin(async move { session.run().await }));
1073        Ok(client)
1074    }
1075
1076    #[moire::instrument]
1077    pub async fn establish_or_resume<Client: FromVoxSession>(
1078        self,
1079    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1080    where
1081        C: Conduit<Msg = MessageFamily> + 'static,
1082        C::Tx: MaybeSend + MaybeSync + 'static,
1083        C::Rx: MaybeSend + 'static,
1084    {
1085        // With the CBOR handshake, resume detection happens at the link level
1086        // before the conduit is created. If the peer sent a resume key in the Hello
1087        // that matches a known session, we resume. Otherwise, we establish.
1088        if let (Some(registry), Some(resume_key)) = (
1089            &self.config.session_registry,
1090            self.handshake_result.peer_resume_key,
1091        ) && let Some(handle) = registry.get(&resume_key)
1092        {
1093            let (tx, rx) = self.conduit.split();
1094            if let Err(error) = handle
1095                .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1096                .await
1097            {
1098                registry.remove(&resume_key);
1099                return Err(error);
1100            }
1101            return Ok(SessionAcceptOutcome::Resumed);
1102        }
1103
1104        let client = self.establish().await?;
1105        Ok(SessionAcceptOutcome::Established(client))
1106    }
1107}
1108
1109pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1110    link: L,
1111    config: SessionConfig<'a>,
1112}
1113
1114impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1115    fn new(link: L) -> Self {
1116        Self {
1117            link,
1118            config: SessionConfig::with_settings(ConnectionSettings {
1119                parity: Parity::Even,
1120                max_concurrent_requests: 64,
1121                initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
1122            }),
1123        }
1124    }
1125
1126    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1127        self.config.root_settings = settings;
1128        self
1129    }
1130
1131    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1132        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1133        self
1134    }
1135
1136    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
1137        self.config.root_settings.initial_channel_credit = channel_capacity;
1138        self
1139    }
1140
1141    // r[impl rpc.observability.runtime]
1142    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
1143        self.config.observer = Some(Arc::new(observer));
1144        self
1145    }
1146
1147    // r[impl rpc.observability.runtime]
1148    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
1149        self.config.observer = Some(observer);
1150        self
1151    }
1152
1153    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1154        self.config.metadata = metadata;
1155        self
1156    }
1157
1158    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1159        self.config.on_connection = Some(Arc::new(acceptor));
1160        self
1161    }
1162
1163    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1164        self.config.keepalive = Some(keepalive);
1165        self
1166    }
1167
1168    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1169        self.config.connect_timeout = Some(timeout);
1170        self
1171    }
1172
1173    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1174        self.config.recovery_timeout = Some(timeout);
1175        self
1176    }
1177
1178    pub fn resumable(mut self) -> Self {
1179        self.config.resumable = true;
1180        self
1181    }
1182
1183    /// Disable session resumability. Useful for IPC transports where
1184    /// the peer is a process: when the process exits the connection
1185    /// is gone for good, there's nothing to resume against, and
1186    /// keeping the session alive in recovery mode just leaks
1187    /// per-channel state (e.g. server-side `Tx<T>`s never observing
1188    /// that the client disconnected).
1189    pub fn non_resumable(mut self) -> Self {
1190        self.config.resumable = false;
1191        self
1192    }
1193
1194    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1195        self.config.session_registry = Some(session_registry);
1196        self
1197    }
1198
1199    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1200        self.config.operation_store = Some(operation_store);
1201        self
1202    }
1203
1204    #[cfg(not(target_arch = "wasm32"))]
1205    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1206        self.config.spawn_fn = Box::new(f);
1207        self
1208    }
1209
1210    #[cfg(target_arch = "wasm32")]
1211    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1212        self.config.spawn_fn = Box::new(f);
1213        self
1214    }
1215
1216    #[moire::instrument]
1217    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1218    where
1219        L: Link + MaybeSend + 'static,
1220        L::Tx: MaybeSend + MaybeSync + 'static,
1221        L::Rx: MaybeSend + 'static,
1222    {
1223        let Self { link, mut config } = self;
1224        inject_service_metadata::<Client>(&mut config.metadata);
1225        let (mode, mut link) = accept_transport(link)
1226            .await
1227            .map_err(session_error_from_transport)?;
1228        match mode {
1229            TransportMode::Bare => {
1230                let handshake_result = handshake_as_acceptor(
1231                    &link.tx,
1232                    &mut link.rx,
1233                    config.root_settings.clone(),
1234                    true,
1235                    config.resumable,
1236                    None,
1237                    metadata_into_owned(config.metadata.clone()),
1238                )
1239                .await
1240                .map_err(session_error_from_handshake)?;
1241                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1242                    .map_err(SessionError::Protocol)?;
1243                let builder = SessionAcceptorBuilder::new(
1244                    BareConduit::with_message_plan(link, message_plan),
1245                    handshake_result,
1246                );
1247                Self::apply_common_parts(builder, config).establish().await
1248            }
1249        }
1250    }
1251
1252    #[moire::instrument]
1253    pub async fn establish_or_resume<Client: FromVoxSession>(
1254        self,
1255    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1256    where
1257        L: Link + MaybeSend + 'static,
1258        L::Tx: MaybeSend + MaybeSync + 'static,
1259        L::Rx: MaybeSend + 'static,
1260    {
1261        let Self { link, config } = self;
1262        let (mode, mut link) = accept_transport(link)
1263            .await
1264            .map_err(session_error_from_transport)?;
1265        match mode {
1266            TransportMode::Bare => {
1267                let handshake_result = handshake_as_acceptor(
1268                    &link.tx,
1269                    &mut link.rx,
1270                    config.root_settings.clone(),
1271                    true,
1272                    config.resumable,
1273                    None,
1274                    metadata_into_owned(config.metadata.clone()),
1275                )
1276                .await
1277                .map_err(session_error_from_handshake)?;
1278                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1279                    .map_err(SessionError::Protocol)?;
1280                let builder = SessionAcceptorBuilder::new(
1281                    BareConduit::with_message_plan(link, message_plan),
1282                    handshake_result,
1283                );
1284                Self::apply_common_parts(builder, config)
1285                    .establish_or_resume()
1286                    .await
1287            }
1288        }
1289    }
1290
1291    fn apply_common_parts<C>(
1292        mut builder: SessionAcceptorBuilder<'a, C>,
1293        config: SessionConfig<'a>,
1294    ) -> SessionAcceptorBuilder<'a, C> {
1295        builder.config = config;
1296        builder
1297    }
1298}
1299
1300fn validate_negotiated_root_settings(
1301    expected_root_settings: &ConnectionSettings,
1302    handshake_result: &HandshakeResult,
1303) -> Result<(), SessionError> {
1304    if expected_root_settings.initial_channel_credit == 0
1305        || handshake_result.peer_settings.initial_channel_credit == 0
1306    {
1307        return Err(SessionError::Protocol(
1308            "initial_channel_credit must be greater than zero".into(),
1309        ));
1310    }
1311
1312    if handshake_result.our_settings != *expected_root_settings {
1313        return Err(SessionError::Protocol(
1314            "negotiated root settings do not match builder settings".into(),
1315        ));
1316    }
1317    Ok(())
1318}
1319
1320fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1321    match error {
1322        crate::HandshakeError::Io(io) => SessionError::Io(io),
1323        crate::HandshakeError::PeerClosed => {
1324            SessionError::Protocol("peer closed during handshake".into())
1325        }
1326        crate::HandshakeError::NotResumable => SessionError::NotResumable,
1327        other => SessionError::Protocol(other.to_string()),
1328    }
1329}
1330
1331fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1332    match error {
1333        crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1334        crate::TransportPrologueError::LinkDead => {
1335            SessionError::Protocol("link closed during transport prologue".into())
1336        }
1337        crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1338        crate::TransportPrologueError::Rejected(reason) => {
1339            SessionError::Protocol(format!("transport rejected: {reason}"))
1340        }
1341    }
1342}