Skip to main content

vox_core/session/
builders.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12    Conduit, ConnectionSettings, DEFAULT_INITIAL_CHANNEL_CREDIT, HandshakeResult, Link, MaybeSend,
13    MaybeSync, MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink, VoxObserver,
14    VoxObserverHandle, metadata_into_owned,
15};
16
17use crate::LinkSource;
18use crate::{
19    BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
20    handshake_as_acceptor, handshake_as_initiator, initiate_transport,
21};
22
23use super::{
24    CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
25    SessionHandle, SessionKeepaliveConfig,
26};
27use crate::FromVoxSession;
28
/// Metadata key under which a service name is carried for routing.
pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
31
32/// Inject `vox-service` metadata from `Client::SERVICE_NAME`.
33fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
34    metadata.push(vox_types::MetadataEntry {
35        key: VOX_SERVICE_METADATA_KEY.into(),
36        value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
37        flags: vox_types::MetadataFlags::NONE,
38    });
39}
40
/// Boxed, pinned future that drives a session and resolves to `()`.
///
/// wasm32 flavour: `'static`, with no `Send` requirement.
#[cfg(target_arch = "wasm32")]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
/// Boxed, pinned future that drives a session and resolves to `()`.
///
/// Non-wasm flavour: `Send + 'static`.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
47
48#[cfg(not(target_arch = "wasm32"))]
49type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
50#[cfg(target_arch = "wasm32")]
51type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
52
53#[cfg(not(target_arch = "wasm32"))]
54fn default_spawn_fn() -> SpawnFn {
55    Box::new(|fut| {
56        tokio::spawn(fut);
57    })
58}
59
60#[cfg(target_arch = "wasm32")]
61fn default_spawn_fn() -> SpawnFn {
62    Box::new(|fut| {
63        wasm_bindgen_futures::spawn_local(fut);
64    })
65}
66
67// r[impl rpc.session-setup]
68// r[impl session.role]
69pub fn initiator_conduit<I: IntoConduit>(
70    into_conduit: I,
71    handshake_result: HandshakeResult,
72) -> SessionInitiatorBuilder<'static, I::Conduit> {
73    SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
74}
75
76pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
77where
78    S: LinkSource,
79{
80    SessionSourceInitiatorBuilder::new(source, mode)
81}
82
83pub fn acceptor_conduit<I: IntoConduit>(
84    into_conduit: I,
85    handshake_result: HandshakeResult,
86) -> SessionAcceptorBuilder<'static, I::Conduit> {
87    SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
88}
89
90/// Convenience: perform CBOR handshake as initiator on a raw link, then return
91/// a builder with the conduit ready to go.
92pub async fn initiator_on_link<L: Link>(
93    link: L,
94    settings: ConnectionSettings,
95) -> Result<
96    SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
97    SessionError,
98>
99where
100    L::Tx: MaybeSend + MaybeSync + 'static,
101    L::Rx: MaybeSend + 'static,
102{
103    let (tx, mut rx) = link.split();
104    let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
105        .await
106        .map_err(session_error_from_handshake)?;
107    let message_plan =
108        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
109    Ok(SessionInitiatorBuilder::new(
110        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
111        handshake_result,
112    ))
113}
114
115/// Convenience: perform CBOR handshake as acceptor on a raw link, then return
116/// a builder with the conduit ready to go.
117pub async fn acceptor_on_link<L: Link>(
118    link: L,
119    settings: ConnectionSettings,
120) -> Result<
121    SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
122    SessionError,
123>
124where
125    L::Tx: MaybeSend + MaybeSync + 'static,
126    L::Rx: MaybeSend + 'static,
127{
128    let (tx, mut rx) = link.split();
129    let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
130        .await
131        .map_err(session_error_from_handshake)?;
132    let message_plan =
133        crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
134    Ok(SessionAcceptorBuilder::new(
135        BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
136        handshake_result,
137    ))
138}
139
140pub fn initiator_on<L: Link>(
141    link: L,
142    mode: TransportMode,
143) -> SessionTransportInitiatorBuilder<'static, L> {
144    SessionTransportInitiatorBuilder::new(link, mode)
145}
146
147pub fn initiator_transport<L: Link>(
148    link: L,
149    mode: TransportMode,
150) -> SessionTransportInitiatorBuilder<'static, L> {
151    initiator_on(link, mode)
152}
153
154pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
155    SessionTransportAcceptorBuilder::new(link)
156}
157
158pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
159    acceptor_on(link)
160}
161
162#[derive(Clone, Default)]
163pub struct SessionRegistry {
164    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
165}
166
167impl SessionRegistry {
168    fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
169        self.inner
170            .lock()
171            .expect("session registry poisoned")
172            .get(key)
173            .cloned()
174    }
175
176    fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
177        self.inner
178            .lock()
179            .expect("session registry poisoned")
180            .insert(key, handle);
181    }
182
183    fn remove(&self, key: &SessionResumeKey) {
184        self.inner
185            .lock()
186            .expect("session registry poisoned")
187            .remove(key);
188    }
189}
190
/// Outcome of accepting a session.
pub enum SessionAcceptOutcome<Client> {
    /// A session was established; carries the resulting client value.
    Established(Client),
    /// No client is produced.
    // NOTE(review): presumably the link was attached to an already-existing
    // session; the code that returns this variant is not in view.
    Resumed,
}
195
196/// Shared configuration for all session builders.
197pub struct SessionConfig<'a> {
198    pub root_settings: ConnectionSettings,
199    pub metadata: Metadata<'a>,
200    pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
201    pub keepalive: Option<SessionKeepaliveConfig>,
202    pub resumable: bool,
203    pub session_registry: Option<SessionRegistry>,
204    pub operation_store: Option<Arc<dyn OperationStore>>,
205    pub spawn_fn: SpawnFn,
206    pub connect_timeout: Option<std::time::Duration>,
207    pub recovery_timeout: Option<std::time::Duration>,
208    pub observer: Option<VoxObserverHandle>,
209}
210
211impl SessionConfig<'_> {
212    fn with_settings(root_settings: ConnectionSettings) -> Self {
213        Self {
214            root_settings,
215            metadata: vec![],
216            on_connection: None,
217            keepalive: None,
218            resumable: true,
219            session_registry: None,
220            operation_store: None,
221            spawn_fn: default_spawn_fn(),
222            connect_timeout: None,
223            recovery_timeout: None,
224            observer: None,
225        }
226    }
227}
228
229impl Default for SessionConfig<'_> {
230    fn default() -> Self {
231        Self::with_settings(ConnectionSettings {
232            parity: Parity::Odd,
233            max_concurrent_requests: 64,
234            initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
235        })
236    }
237}
238
239pub struct SessionInitiatorBuilder<'a, C> {
240    conduit: C,
241    handshake_result: HandshakeResult,
242    config: SessionConfig<'a>,
243    recoverer: Option<Box<dyn ConduitRecoverer>>,
244}
245
246impl<'a, C> SessionInitiatorBuilder<'a, C> {
247    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
248        let root_settings = handshake_result.our_settings.clone();
249        let mut config = SessionConfig::with_settings(root_settings);
250        // Conduit builders default to non-resumable — callers opt in with .resumable()
251        config.resumable = false;
252        Self {
253            conduit,
254            handshake_result,
255            config,
256            recoverer: None,
257        }
258    }
259
260    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
261        self.config.on_connection = Some(Arc::new(acceptor));
262        self
263    }
264
265    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
266        self.config.keepalive = Some(keepalive);
267        self
268    }
269
270    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
271        self.config.root_settings.initial_channel_credit = channel_capacity;
272        self
273    }
274
275    // r[impl rpc.observability.runtime]
276    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
277        self.config.observer = Some(Arc::new(observer));
278        self
279    }
280
281    // r[impl rpc.observability.runtime]
282    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
283        self.config.observer = Some(observer);
284        self
285    }
286
287    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
288        self.config.connect_timeout = Some(timeout);
289        self
290    }
291
292    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
293        self.config.recovery_timeout = Some(timeout);
294        self
295    }
296
297    pub fn resumable(mut self) -> Self {
298        self.config.resumable = true;
299        self
300    }
301
302    /// Disable session resumability. Useful for IPC transports where
303    /// the peer is a process: when the process exits the connection
304    /// is gone for good, there's nothing to resume against, and
305    /// keeping the session alive in recovery mode just leaks
306    /// per-channel state (e.g. server-side `Tx<T>`s never observing
307    /// that the client disconnected).
308    pub fn non_resumable(mut self) -> Self {
309        self.config.resumable = false;
310        self
311    }
312
313    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
314        self.config.operation_store = Some(operation_store);
315        self
316    }
317
318    /// Override the function used to spawn the session background task.
319    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
320    #[cfg(not(target_arch = "wasm32"))]
321    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
322        self.config.spawn_fn = Box::new(f);
323        self
324    }
325
326    /// Override the function used to spawn the session background task.
327    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
328    #[cfg(target_arch = "wasm32")]
329    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
330        self.config.spawn_fn = Box::new(f);
331        self
332    }
333
334    /// Establish a session using the given settings, on the given link source, etc,
335    ///
336    ///   - requiring (as an arg) a handler for the service the local peer will serve
337    ///   - returning a caller for the service we expect the remote peer to serve
338    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
339    where
340        C: Conduit<Msg = MessageFamily> + 'static,
341        C::Tx: MaybeSend + MaybeSync + 'static,
342        C::Rx: MaybeSend + 'static,
343    {
344        let Self {
345            conduit,
346            mut handshake_result,
347            config,
348            recoverer,
349        } = self;
350        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
351        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
352        let (tx, rx) = conduit.split();
353        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
354        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
355        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
356        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
357        let acceptor: Arc<dyn ConnectionAcceptor> =
358            config.on_connection.unwrap_or_else(|| Arc::new(()));
359        let mut session = Session::pre_handshake(
360            tx,
361            rx,
362            Some(acceptor.clone()),
363            open_rx,
364            close_rx,
365            resume_rx,
366            control_tx.clone(),
367            control_rx,
368            config.keepalive,
369            config.resumable,
370            recoverer,
371            config.recovery_timeout,
372            config.observer.clone(),
373        );
374        let handle = session.establish_from_handshake(handshake_result)?;
375        let resume_key = session.resume_key();
376        let session_handle = SessionHandle {
377            open_tx,
378            close_tx,
379            resume_tx,
380            control_tx,
381            resume_key,
382        };
383        // Route the root connection through the acceptor.
384        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
385        let pending = super::PendingConnection::with_caller_slot(
386            handle,
387            caller_slot.clone(),
388            config.operation_store,
389        );
390        peer_metadata.push(vox_types::MetadataEntry::str(
391            VOX_SERVICE_METADATA_KEY,
392            Client::SERVICE_NAME,
393        ));
394        let request = super::ConnectionRequest::new(&peer_metadata)?;
395        acceptor
396            .accept(&request, pending)
397            .map_err(SessionError::Rejected)?;
398        let caller =
399            caller_slot.lock().unwrap().take().expect(
400                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
401            );
402        let client = Client::from_vox_session(caller, Some(session_handle));
403        (config.spawn_fn)(Box::pin(async move { session.run().await }));
404        Ok(client)
405    }
406}
407
408pub struct SessionSourceInitiatorBuilder<'a, S> {
409    source: S,
410    mode: TransportMode,
411    config: SessionConfig<'a>,
412}
413
414impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
415    fn new(source: S, mode: TransportMode) -> Self {
416        let config = SessionConfig {
417            resumable: false,
418            ..SessionConfig::default()
419        };
420        Self {
421            source,
422            mode,
423            config,
424        }
425    }
426
427    pub fn parity(mut self, parity: Parity) -> Self {
428        self.config.root_settings.parity = parity;
429        self
430    }
431
432    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
433        self.config.root_settings = settings;
434        self
435    }
436
437    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
438        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
439        self
440    }
441
442    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
443        self.config.root_settings.initial_channel_credit = channel_capacity;
444        self
445    }
446
447    // r[impl rpc.observability.runtime]
448    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
449        self.config.observer = Some(Arc::new(observer));
450        self
451    }
452
453    // r[impl rpc.observability.runtime]
454    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
455        self.config.observer = Some(observer);
456        self
457    }
458
459    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
460        self.config.metadata = metadata;
461        self
462    }
463
464    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
465        self.config.on_connection = Some(Arc::new(acceptor));
466        self
467    }
468
469    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
470        self.config.keepalive = Some(keepalive);
471        self
472    }
473
474    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
475        self.config.connect_timeout = Some(timeout);
476        self
477    }
478
479    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
480        self.config.recovery_timeout = Some(timeout);
481        self
482    }
483
484    pub fn resumable(mut self) -> Self {
485        self.config.resumable = true;
486        self
487    }
488
489    /// Disable session resumability. Useful for IPC transports where
490    /// the peer is a process: when the process exits the connection
491    /// is gone for good, there's nothing to resume against, and
492    /// keeping the session alive in recovery mode just leaks
493    /// per-channel state (e.g. server-side `Tx<T>`s never observing
494    /// that the client disconnected).
495    pub fn non_resumable(mut self) -> Self {
496        self.config.resumable = false;
497        self
498    }
499
500    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
501        self.config.operation_store = Some(operation_store);
502        self
503    }
504
505    #[cfg(not(target_arch = "wasm32"))]
506    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
507        self.config.spawn_fn = Box::new(f);
508        self
509    }
510
511    #[cfg(target_arch = "wasm32")]
512    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
513        self.config.spawn_fn = Box::new(f);
514        self
515    }
516
517    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
518    where
519        S: LinkSource,
520        S::Link: Link + MaybeSend + 'static,
521        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
522        <S::Link as Link>::Rx: MaybeSend + 'static,
523    {
524        let connect_timeout = self.config.connect_timeout;
525        let fut = self.establish_inner::<Client>();
526        match connect_timeout {
527            Some(timeout) => time::timeout(timeout, fut)
528                .await
529                .map_err(|_| SessionError::ConnectTimeout)?,
530            None => fut.await,
531        }
532    }
533
534    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
535    where
536        S: LinkSource,
537        S::Link: Link + MaybeSend + 'static,
538        <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
539        <S::Link as Link>::Rx: MaybeSend + 'static,
540    {
541        let Self {
542            mut source,
543            mode,
544            mut config,
545        } = self;
546        inject_service_metadata::<Client>(&mut config.metadata);
547        let _ = mode;
548
549        {
550            {
551                let attachment = source.next_link().await.map_err(SessionError::Io)?;
552                let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
553                    .await
554                    .map_err(session_error_from_transport)?;
555                let handshake_result = handshake_as_initiator(
556                    &link.tx,
557                    &mut link.rx,
558                    config.root_settings.clone(),
559                    true,
560                    None,
561                    metadata_into_owned(config.metadata.clone()),
562                )
563                .await
564                .map_err(session_error_from_handshake)?;
565                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
566                    .map_err(SessionError::Protocol)?;
567                let builder = SessionInitiatorBuilder::new(
568                    BareConduit::with_message_plan(link, message_plan),
569                    handshake_result,
570                );
571                let recoverer = Box::new(BareSourceRecoverer {
572                    source,
573                    settings: config.root_settings.clone(),
574                    connect_timeout: config.connect_timeout,
575                    metadata: metadata_into_owned(config.metadata.clone()),
576                });
577                SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
578                    builder,
579                    config,
580                    Some(recoverer),
581                )
582                .establish()
583                .await
584            }
585        }
586    }
587}
588
589pub struct SessionTransportInitiatorBuilder<'a, L> {
590    link: L,
591    mode: TransportMode,
592    config: SessionConfig<'a>,
593}
594
595impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
596    fn new(link: L, mode: TransportMode) -> Self {
597        let config = SessionConfig {
598            resumable: false,
599            ..SessionConfig::default()
600        };
601        Self { link, mode, config }
602    }
603
604    pub fn parity(mut self, parity: Parity) -> Self {
605        self.config.root_settings.parity = parity;
606        self
607    }
608
609    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
610        self.config.root_settings = settings;
611        self
612    }
613
614    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
615        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
616        self
617    }
618
619    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
620        self.config.root_settings.initial_channel_credit = channel_capacity;
621        self
622    }
623
624    // r[impl rpc.observability.runtime]
625    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
626        self.config.observer = Some(Arc::new(observer));
627        self
628    }
629
630    // r[impl rpc.observability.runtime]
631    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
632        self.config.observer = Some(observer);
633        self
634    }
635
636    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
637        self.config.metadata = metadata;
638        self
639    }
640
641    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
642        self.config.on_connection = Some(Arc::new(acceptor));
643        self
644    }
645
646    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
647        self.config.keepalive = Some(keepalive);
648        self
649    }
650
651    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
652        self.config.connect_timeout = Some(timeout);
653        self
654    }
655
656    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
657        self.config.recovery_timeout = Some(timeout);
658        self
659    }
660
661    pub fn resumable(mut self) -> Self {
662        self.config.resumable = true;
663        self
664    }
665
666    /// Disable session resumability. Useful for IPC transports where
667    /// the peer is a process: when the process exits the connection
668    /// is gone for good, there's nothing to resume against, and
669    /// keeping the session alive in recovery mode just leaks
670    /// per-channel state (e.g. server-side `Tx<T>`s never observing
671    /// that the client disconnected).
672    pub fn non_resumable(mut self) -> Self {
673        self.config.resumable = false;
674        self
675    }
676
677    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
678        self.config.operation_store = Some(operation_store);
679        self
680    }
681
682    #[cfg(not(target_arch = "wasm32"))]
683    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
684        self.config.spawn_fn = Box::new(f);
685        self
686    }
687
688    #[cfg(target_arch = "wasm32")]
689    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
690        self.config.spawn_fn = Box::new(f);
691        self
692    }
693
694    #[cfg(not(target_arch = "wasm32"))]
695    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
696    where
697        L: Link + Send + 'static,
698        L::Tx: MaybeSend + MaybeSync + 'static,
699        L::Rx: MaybeSend + 'static,
700    {
701        let connect_timeout = self.config.connect_timeout;
702        let fut = self.establish_inner::<Client>();
703        match connect_timeout {
704            Some(timeout) => vox_types::time::tokio::timeout(timeout, fut)
705                .await
706                .map_err(|_| SessionError::ConnectTimeout)?,
707            None => fut.await,
708        }
709    }
710
711    #[cfg(not(target_arch = "wasm32"))]
712    async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
713    where
714        L: Link + Send + 'static,
715        L::Tx: MaybeSend + MaybeSync + 'static,
716        L::Rx: MaybeSend + 'static,
717    {
718        let Self {
719            link,
720            mode,
721            mut config,
722        } = self;
723        inject_service_metadata::<Client>(&mut config.metadata);
724        let _ = mode;
725        let link = initiate_transport(link, TransportMode::Bare)
726            .await
727            .map_err(session_error_from_transport)?;
728        Self::finish_with_bare_parts(link, config).await
729    }
730
731    #[cfg(target_arch = "wasm32")]
732    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
733    where
734        L: Link + 'static,
735        L::Tx: MaybeSend + MaybeSync + 'static,
736        L::Rx: MaybeSend + 'static,
737    {
738        let Self {
739            link,
740            mode,
741            mut config,
742        } = self;
743        inject_service_metadata::<Client>(&mut config.metadata);
744        match mode {
745            TransportMode::Bare => {
746                let link = initiate_transport(link, TransportMode::Bare)
747                    .await
748                    .map_err(session_error_from_transport)?;
749                Self::finish_with_bare_parts(link, config).await
750            }
751        }
752    }
753
754    async fn finish_with_bare_parts<Client: FromVoxSession>(
755        mut link: SplitLink<L::Tx, L::Rx>,
756        config: SessionConfig<'a>,
757    ) -> Result<Client, SessionError>
758    where
759        L: Link + 'static,
760        L::Tx: MaybeSend + MaybeSync + 'static,
761        L::Rx: MaybeSend + 'static,
762    {
763        let handshake_result = handshake_as_initiator(
764            &link.tx,
765            &mut link.rx,
766            config.root_settings.clone(),
767            true,
768            None,
769            metadata_into_owned(config.metadata.clone()),
770        )
771        .await
772        .map_err(session_error_from_handshake)?;
773        let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
774            .map_err(SessionError::Protocol)?;
775        let builder = SessionInitiatorBuilder::new(
776            BareConduit::with_message_plan(link, message_plan),
777            handshake_result,
778        );
779        Self::apply_common_parts(builder, config, None)
780            .establish()
781            .await
782    }
783
784    #[allow(clippy::too_many_arguments)]
785    fn apply_common_parts<C>(
786        mut builder: SessionInitiatorBuilder<'a, C>,
787        config: SessionConfig<'a>,
788        recoverer: Option<Box<dyn ConduitRecoverer>>,
789    ) -> SessionInitiatorBuilder<'a, C> {
790        builder.config = config;
791        builder.recoverer = recoverer;
792        builder
793    }
794}
795
796struct BareSourceRecoverer<S> {
797    source: S,
798    settings: ConnectionSettings,
799    connect_timeout: Option<Duration>,
800    metadata: Metadata<'static>,
801}
802
/// Delay before the second recovery attempt; doubled after each failure.
const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
/// Ceiling for the delay between recovery attempts.
const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
805
806impl<S> ConduitRecoverer for BareSourceRecoverer<S>
807where
808    S: LinkSource,
809    S::Link: Link + MaybeSend + 'static,
810    <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
811    <S::Link as Link>::Rx: MaybeSend + 'static,
812{
813    fn next_conduit<'a>(
814        &'a mut self,
815        resume_key: Option<&'a SessionResumeKey>,
816    ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
817        Box::pin(async move {
818            let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
819            let mut use_resume_key = resume_key.is_some();
820
821            loop {
822                let selected_resume_key = if use_resume_key { resume_key } else { None };
823
824                let attempt = async {
825                    let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
826                    let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
827                        .await
828                        .map_err(session_error_from_transport)?;
829                    let handshake_result = handshake_as_initiator(
830                        &link.tx,
831                        &mut link.rx,
832                        self.settings.clone(),
833                        true,
834                        selected_resume_key,
835                        metadata_into_owned(self.metadata.clone()),
836                    )
837                    .await
838                    .map_err(session_error_from_handshake)?;
839                    let conduit = BareConduit::<MessageFamily, _>::new(link);
840                    let (tx, rx) = conduit.split();
841                    Ok(super::RecoveredConduit {
842                        tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
843                        rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
844                        handshake: handshake_result,
845                    })
846                };
847
848                let result = match self.connect_timeout {
849                    Some(timeout) => match time::timeout(timeout, attempt).await {
850                        Ok(r) => r,
851                        Err(_) => Err(SessionError::ConnectTimeout),
852                    },
853                    None => attempt.await,
854                };
855
856                match result {
857                    Ok(conduit) => return Ok(conduit),
858                    Err(e) if !e.is_retryable() => return Err(e),
859                    Err(_) => {}
860                }
861
862                if use_resume_key {
863                    // If a resumption attempt is rejected once, continue trying without
864                    // a resume key so restart scenarios can establish a fresh session.
865                    use_resume_key = false;
866                }
867
868                time::sleep(backoff).await;
869                backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
870            }
871        })
872    }
873}
874
875pub struct SessionAcceptorBuilder<'a, C> {
876    conduit: C,
877    handshake_result: HandshakeResult,
878    config: SessionConfig<'a>,
879}
880
881impl<'a, C> SessionAcceptorBuilder<'a, C> {
882    fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
883        let root_settings = handshake_result.our_settings.clone();
884        let mut config = SessionConfig::with_settings(root_settings);
885        // Conduit builders default to non-resumable — callers opt in with .resumable()
886        config.resumable = false;
887        Self {
888            conduit,
889            handshake_result,
890            config,
891        }
892    }
893
894    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
895        self.config.on_connection = Some(Arc::new(acceptor));
896        self
897    }
898
899    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
900        self.config.keepalive = Some(keepalive);
901        self
902    }
903
904    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
905        self.config.root_settings.initial_channel_credit = channel_capacity;
906        self
907    }
908
909    // r[impl rpc.observability.runtime]
910    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
911        self.config.observer = Some(Arc::new(observer));
912        self
913    }
914
915    // r[impl rpc.observability.runtime]
916    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
917        self.config.observer = Some(observer);
918        self
919    }
920
921    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
922        self.config.connect_timeout = Some(timeout);
923        self
924    }
925
926    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
927        self.config.recovery_timeout = Some(timeout);
928        self
929    }
930
931    pub fn resumable(mut self) -> Self {
932        self.config.resumable = true;
933        self
934    }
935
936    /// Disable session resumability. Useful for IPC transports where
937    /// the peer is a process: when the process exits the connection
938    /// is gone for good, there's nothing to resume against, and
939    /// keeping the session alive in recovery mode just leaks
940    /// per-channel state (e.g. server-side `Tx<T>`s never observing
941    /// that the client disconnected).
942    pub fn non_resumable(mut self) -> Self {
943        self.config.resumable = false;
944        self
945    }
946
947    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
948        self.config.session_registry = Some(session_registry);
949        self
950    }
951
952    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
953        self.config.operation_store = Some(operation_store);
954        self
955    }
956
957    /// Override the function used to spawn the session background task.
958    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
959    #[cfg(not(target_arch = "wasm32"))]
960    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
961        self.config.spawn_fn = Box::new(f);
962        self
963    }
964
965    /// Override the function used to spawn the session background task.
966    /// Defaults to `tokio::spawn` on non-WASM and `wasm_bindgen_futures::spawn_local` on WASM.
967    #[cfg(target_arch = "wasm32")]
968    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
969        self.config.spawn_fn = Box::new(f);
970        self
971    }
972
973    #[moire::instrument]
974    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
975    where
976        C: Conduit<Msg = MessageFamily> + 'static,
977        C::Tx: MaybeSend + MaybeSync + 'static,
978        C::Rx: MaybeSend + 'static,
979    {
980        let Self {
981            conduit,
982            mut handshake_result,
983            config,
984        } = self;
985        validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
986        let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
987        let (tx, rx) = conduit.split();
988        let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
989        let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
990        let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
991        let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
992        let acceptor: Arc<dyn ConnectionAcceptor> =
993            config.on_connection.unwrap_or_else(|| Arc::new(()));
994        let mut session = Session::pre_handshake(
995            tx,
996            rx,
997            Some(acceptor.clone()),
998            open_rx,
999            close_rx,
1000            resume_rx,
1001            control_tx.clone(),
1002            control_rx,
1003            config.keepalive,
1004            config.resumable,
1005            None,
1006            config.recovery_timeout,
1007            config.observer.clone(),
1008        );
1009        let handle = session.establish_from_handshake(handshake_result)?;
1010        let resume_key = session.resume_key();
1011        let session_handle = SessionHandle {
1012            open_tx,
1013            close_tx,
1014            resume_tx,
1015            control_tx,
1016            resume_key,
1017        };
1018        if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1019            registry.insert(key, session_handle.clone());
1020            session.registered_in_registry = true;
1021        }
1022        // Route the root connection through the acceptor.
1023        let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1024        let pending = super::PendingConnection::with_caller_slot(
1025            handle,
1026            caller_slot.clone(),
1027            config.operation_store,
1028        );
1029        peer_metadata.push(vox_types::MetadataEntry::str(
1030            VOX_SERVICE_METADATA_KEY,
1031            Client::SERVICE_NAME,
1032        ));
1033        let request = super::ConnectionRequest::new(&peer_metadata)?;
1034        acceptor
1035            .accept(&request, pending)
1036            .map_err(SessionError::Rejected)?;
1037        let caller =
1038            caller_slot.lock().unwrap().take().expect(
1039                "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1040            );
1041        let client = Client::from_vox_session(caller, Some(session_handle));
1042        (config.spawn_fn)(Box::pin(async move { session.run().await }));
1043        Ok(client)
1044    }
1045
1046    #[moire::instrument]
1047    pub async fn establish_or_resume<Client: FromVoxSession>(
1048        self,
1049    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1050    where
1051        C: Conduit<Msg = MessageFamily> + 'static,
1052        C::Tx: MaybeSend + MaybeSync + 'static,
1053        C::Rx: MaybeSend + 'static,
1054    {
1055        // With the CBOR handshake, resume detection happens at the link level
1056        // before the conduit is created. If the peer sent a resume key in the Hello
1057        // that matches a known session, we resume. Otherwise, we establish.
1058        if let (Some(registry), Some(resume_key)) = (
1059            &self.config.session_registry,
1060            self.handshake_result.peer_resume_key,
1061        ) && let Some(handle) = registry.get(&resume_key)
1062        {
1063            let (tx, rx) = self.conduit.split();
1064            if let Err(error) = handle
1065                .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1066                .await
1067            {
1068                registry.remove(&resume_key);
1069                return Err(error);
1070            }
1071            return Ok(SessionAcceptOutcome::Resumed);
1072        }
1073
1074        let client = self.establish().await?;
1075        Ok(SessionAcceptOutcome::Established(client))
1076    }
1077}
1078
1079pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1080    link: L,
1081    config: SessionConfig<'a>,
1082}
1083
1084impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1085    fn new(link: L) -> Self {
1086        Self {
1087            link,
1088            config: SessionConfig::with_settings(ConnectionSettings {
1089                parity: Parity::Even,
1090                max_concurrent_requests: 64,
1091                initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
1092            }),
1093        }
1094    }
1095
1096    pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1097        self.config.root_settings = settings;
1098        self
1099    }
1100
1101    pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1102        self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1103        self
1104    }
1105
1106    pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
1107        self.config.root_settings.initial_channel_credit = channel_capacity;
1108        self
1109    }
1110
1111    // r[impl rpc.observability.runtime]
1112    pub fn observer(mut self, observer: impl VoxObserver) -> Self {
1113        self.config.observer = Some(Arc::new(observer));
1114        self
1115    }
1116
1117    // r[impl rpc.observability.runtime]
1118    pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
1119        self.config.observer = Some(observer);
1120        self
1121    }
1122
1123    pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1124        self.config.metadata = metadata;
1125        self
1126    }
1127
1128    pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1129        self.config.on_connection = Some(Arc::new(acceptor));
1130        self
1131    }
1132
1133    pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1134        self.config.keepalive = Some(keepalive);
1135        self
1136    }
1137
1138    pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1139        self.config.connect_timeout = Some(timeout);
1140        self
1141    }
1142
1143    pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1144        self.config.recovery_timeout = Some(timeout);
1145        self
1146    }
1147
1148    pub fn resumable(mut self) -> Self {
1149        self.config.resumable = true;
1150        self
1151    }
1152
1153    /// Disable session resumability. Useful for IPC transports where
1154    /// the peer is a process: when the process exits the connection
1155    /// is gone for good, there's nothing to resume against, and
1156    /// keeping the session alive in recovery mode just leaks
1157    /// per-channel state (e.g. server-side `Tx<T>`s never observing
1158    /// that the client disconnected).
1159    pub fn non_resumable(mut self) -> Self {
1160        self.config.resumable = false;
1161        self
1162    }
1163
1164    pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1165        self.config.session_registry = Some(session_registry);
1166        self
1167    }
1168
1169    pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1170        self.config.operation_store = Some(operation_store);
1171        self
1172    }
1173
1174    #[cfg(not(target_arch = "wasm32"))]
1175    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1176        self.config.spawn_fn = Box::new(f);
1177        self
1178    }
1179
1180    #[cfg(target_arch = "wasm32")]
1181    pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1182        self.config.spawn_fn = Box::new(f);
1183        self
1184    }
1185
1186    #[moire::instrument]
1187    pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1188    where
1189        L: Link + MaybeSend + 'static,
1190        L::Tx: MaybeSend + MaybeSync + 'static,
1191        L::Rx: MaybeSend + 'static,
1192    {
1193        let Self { link, mut config } = self;
1194        inject_service_metadata::<Client>(&mut config.metadata);
1195        let (mode, mut link) = accept_transport(link)
1196            .await
1197            .map_err(session_error_from_transport)?;
1198        match mode {
1199            TransportMode::Bare => {
1200                let handshake_result = handshake_as_acceptor(
1201                    &link.tx,
1202                    &mut link.rx,
1203                    config.root_settings.clone(),
1204                    true,
1205                    config.resumable,
1206                    None,
1207                    metadata_into_owned(config.metadata.clone()),
1208                )
1209                .await
1210                .map_err(session_error_from_handshake)?;
1211                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1212                    .map_err(SessionError::Protocol)?;
1213                let builder = SessionAcceptorBuilder::new(
1214                    BareConduit::with_message_plan(link, message_plan),
1215                    handshake_result,
1216                );
1217                Self::apply_common_parts(builder, config).establish().await
1218            }
1219        }
1220    }
1221
1222    #[moire::instrument]
1223    pub async fn establish_or_resume<Client: FromVoxSession>(
1224        self,
1225    ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1226    where
1227        L: Link + MaybeSend + 'static,
1228        L::Tx: MaybeSend + MaybeSync + 'static,
1229        L::Rx: MaybeSend + 'static,
1230    {
1231        let Self { link, config } = self;
1232        let (mode, mut link) = accept_transport(link)
1233            .await
1234            .map_err(session_error_from_transport)?;
1235        match mode {
1236            TransportMode::Bare => {
1237                let handshake_result = handshake_as_acceptor(
1238                    &link.tx,
1239                    &mut link.rx,
1240                    config.root_settings.clone(),
1241                    true,
1242                    config.resumable,
1243                    None,
1244                    metadata_into_owned(config.metadata.clone()),
1245                )
1246                .await
1247                .map_err(session_error_from_handshake)?;
1248                let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1249                    .map_err(SessionError::Protocol)?;
1250                let builder = SessionAcceptorBuilder::new(
1251                    BareConduit::with_message_plan(link, message_plan),
1252                    handshake_result,
1253                );
1254                Self::apply_common_parts(builder, config)
1255                    .establish_or_resume()
1256                    .await
1257            }
1258        }
1259    }
1260
1261    fn apply_common_parts<C>(
1262        mut builder: SessionAcceptorBuilder<'a, C>,
1263        config: SessionConfig<'a>,
1264    ) -> SessionAcceptorBuilder<'a, C> {
1265        builder.config = config;
1266        builder
1267    }
1268}
1269
1270fn validate_negotiated_root_settings(
1271    expected_root_settings: &ConnectionSettings,
1272    handshake_result: &HandshakeResult,
1273) -> Result<(), SessionError> {
1274    if expected_root_settings.initial_channel_credit == 0
1275        || handshake_result.peer_settings.initial_channel_credit == 0
1276    {
1277        return Err(SessionError::Protocol(
1278            "initial_channel_credit must be greater than zero".into(),
1279        ));
1280    }
1281
1282    if handshake_result.our_settings != *expected_root_settings {
1283        return Err(SessionError::Protocol(
1284            "negotiated root settings do not match builder settings".into(),
1285        ));
1286    }
1287    Ok(())
1288}
1289
1290fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1291    match error {
1292        crate::HandshakeError::Io(io) => SessionError::Io(io),
1293        crate::HandshakeError::PeerClosed => {
1294            SessionError::Protocol("peer closed during handshake".into())
1295        }
1296        crate::HandshakeError::NotResumable => SessionError::NotResumable,
1297        other => SessionError::Protocol(other.to_string()),
1298    }
1299}
1300
1301fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1302    match error {
1303        crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1304        crate::TransportPrologueError::LinkDead => {
1305            SessionError::Protocol("link closed during transport prologue".into())
1306        }
1307        crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1308        crate::TransportPrologueError::Rejected(reason) => {
1309            SessionError::Protocol(format!("transport rejected: {reason}"))
1310        }
1311    }
1312}