1use std::{
2 collections::HashMap,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12 Conduit, ConnectionSettings, DEFAULT_INITIAL_CHANNEL_CREDIT, HandshakeResult, Link, MaybeSend,
13 MaybeSync, MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink, VoxObserver,
14 VoxObserverHandle, metadata_into_owned,
15};
16
17use crate::LinkSource;
18use crate::{
19 BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
20 handshake_as_acceptor, handshake_as_initiator, initiate_transport,
21};
22
23use super::{
24 CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
25 SessionHandle, SessionKeepaliveConfig,
26};
27use crate::FromVoxSession;
28
29pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
31
32fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
34 metadata.push(vox_types::MetadataEntry {
35 key: VOX_SERVICE_METADATA_KEY.into(),
36 value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
37 flags: vox_types::MetadataFlags::NONE,
38 });
39}
40
41#[cfg(not(target_arch = "wasm32"))]
44pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
45#[cfg(target_arch = "wasm32")]
46pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
47
48#[cfg(not(target_arch = "wasm32"))]
49type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
50#[cfg(target_arch = "wasm32")]
51type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
52
53#[cfg(not(target_arch = "wasm32"))]
54fn default_spawn_fn() -> SpawnFn {
55 Box::new(|fut| {
56 tokio::spawn(fut);
57 })
58}
59
60#[cfg(target_arch = "wasm32")]
61fn default_spawn_fn() -> SpawnFn {
62 Box::new(|fut| {
63 wasm_bindgen_futures::spawn_local(fut);
64 })
65}
66
67pub fn initiator_conduit<I: IntoConduit>(
70 into_conduit: I,
71 handshake_result: HandshakeResult,
72) -> SessionInitiatorBuilder<'static, I::Conduit> {
73 SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
74}
75
76pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
77where
78 S: LinkSource,
79{
80 SessionSourceInitiatorBuilder::new(source, mode)
81}
82
83pub fn acceptor_conduit<I: IntoConduit>(
84 into_conduit: I,
85 handshake_result: HandshakeResult,
86) -> SessionAcceptorBuilder<'static, I::Conduit> {
87 SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
88}
89
90pub async fn initiator_on_link<L: Link>(
93 link: L,
94 settings: ConnectionSettings,
95) -> Result<
96 SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
97 SessionError,
98>
99where
100 L::Tx: MaybeSend + MaybeSync + 'static,
101 L::Rx: MaybeSend + 'static,
102{
103 let (tx, mut rx) = link.split();
104 let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
105 .await
106 .map_err(session_error_from_handshake)?;
107 let message_plan =
108 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
109 Ok(SessionInitiatorBuilder::new(
110 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
111 handshake_result,
112 ))
113}
114
115pub async fn acceptor_on_link<L: Link>(
118 link: L,
119 settings: ConnectionSettings,
120) -> Result<
121 SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
122 SessionError,
123>
124where
125 L::Tx: MaybeSend + MaybeSync + 'static,
126 L::Rx: MaybeSend + 'static,
127{
128 let (tx, mut rx) = link.split();
129 let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
130 .await
131 .map_err(session_error_from_handshake)?;
132 let message_plan =
133 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
134 Ok(SessionAcceptorBuilder::new(
135 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
136 handshake_result,
137 ))
138}
139
140pub fn initiator_on<L: Link>(
141 link: L,
142 mode: TransportMode,
143) -> SessionTransportInitiatorBuilder<'static, L> {
144 SessionTransportInitiatorBuilder::new(link, mode)
145}
146
147pub fn initiator_transport<L: Link>(
148 link: L,
149 mode: TransportMode,
150) -> SessionTransportInitiatorBuilder<'static, L> {
151 initiator_on(link, mode)
152}
153
154pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
155 SessionTransportAcceptorBuilder::new(link)
156}
157
158pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
159 acceptor_on(link)
160}
161
162#[derive(Clone, Default)]
163pub struct SessionRegistry {
164 inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
165}
166
167impl SessionRegistry {
168 fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
169 self.inner
170 .lock()
171 .expect("session registry poisoned")
172 .get(key)
173 .cloned()
174 }
175
176 fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
177 self.inner
178 .lock()
179 .expect("session registry poisoned")
180 .insert(key, handle);
181 }
182
183 fn remove(&self, key: &SessionResumeKey) {
184 self.inner
185 .lock()
186 .expect("session registry poisoned")
187 .remove(key);
188 }
189}
190
191pub enum SessionAcceptOutcome<Client> {
192 Established(Client),
193 Resumed,
194}
195
196pub struct SessionConfig<'a> {
198 pub root_settings: ConnectionSettings,
199 pub metadata: Metadata<'a>,
200 pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
201 pub keepalive: Option<SessionKeepaliveConfig>,
202 pub resumable: bool,
203 pub session_registry: Option<SessionRegistry>,
204 pub operation_store: Option<Arc<dyn OperationStore>>,
205 pub spawn_fn: SpawnFn,
206 pub connect_timeout: Option<std::time::Duration>,
207 pub recovery_timeout: Option<std::time::Duration>,
208 pub observer: Option<VoxObserverHandle>,
209}
210
211impl SessionConfig<'_> {
212 fn with_settings(root_settings: ConnectionSettings) -> Self {
213 Self {
214 root_settings,
215 metadata: vec![],
216 on_connection: None,
217 keepalive: None,
218 resumable: true,
219 session_registry: None,
220 operation_store: None,
221 spawn_fn: default_spawn_fn(),
222 connect_timeout: None,
223 recovery_timeout: None,
224 observer: None,
225 }
226 }
227}
228
229impl Default for SessionConfig<'_> {
230 fn default() -> Self {
231 Self::with_settings(ConnectionSettings {
232 parity: Parity::Odd,
233 max_concurrent_requests: 64,
234 initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
235 })
236 }
237}
238
239pub struct SessionInitiatorBuilder<'a, C> {
240 conduit: C,
241 handshake_result: HandshakeResult,
242 config: SessionConfig<'a>,
243 recoverer: Option<Box<dyn ConduitRecoverer>>,
244}
245
246impl<'a, C> SessionInitiatorBuilder<'a, C> {
247 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
248 let root_settings = handshake_result.our_settings.clone();
249 let mut config = SessionConfig::with_settings(root_settings);
250 config.resumable = false;
252 Self {
253 conduit,
254 handshake_result,
255 config,
256 recoverer: None,
257 }
258 }
259
260 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
261 self.config.on_connection = Some(Arc::new(acceptor));
262 self
263 }
264
265 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
266 self.config.keepalive = Some(keepalive);
267 self
268 }
269
270 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
271 self.config.root_settings.initial_channel_credit = channel_capacity;
272 self
273 }
274
275 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
277 self.config.observer = Some(Arc::new(observer));
278 self
279 }
280
281 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
283 self.config.observer = Some(observer);
284 self
285 }
286
287 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
288 self.config.connect_timeout = Some(timeout);
289 self
290 }
291
292 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
293 self.config.recovery_timeout = Some(timeout);
294 self
295 }
296
297 pub fn resumable(mut self) -> Self {
298 self.config.resumable = true;
299 self
300 }
301
302 pub fn non_resumable(mut self) -> Self {
309 self.config.resumable = false;
310 self
311 }
312
313 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
314 self.config.operation_store = Some(operation_store);
315 self
316 }
317
318 #[cfg(not(target_arch = "wasm32"))]
321 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
322 self.config.spawn_fn = Box::new(f);
323 self
324 }
325
326 #[cfg(target_arch = "wasm32")]
329 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
330 self.config.spawn_fn = Box::new(f);
331 self
332 }
333
334 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
339 where
340 C: Conduit<Msg = MessageFamily> + 'static,
341 C::Tx: MaybeSend + MaybeSync + 'static,
342 C::Rx: MaybeSend + 'static,
343 {
344 let Self {
345 conduit,
346 mut handshake_result,
347 config,
348 recoverer,
349 } = self;
350 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
351 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
352 let (tx, rx) = conduit.split();
353 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
354 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
355 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
356 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
357 let acceptor: Arc<dyn ConnectionAcceptor> =
358 config.on_connection.unwrap_or_else(|| Arc::new(()));
359 let mut session = Session::pre_handshake(
360 tx,
361 rx,
362 Some(acceptor.clone()),
363 open_rx,
364 close_rx,
365 resume_rx,
366 control_tx.clone(),
367 control_rx,
368 config.keepalive,
369 config.resumable,
370 recoverer,
371 config.recovery_timeout,
372 config.observer.clone(),
373 );
374 let handle = session.establish_from_handshake(handshake_result)?;
375 let resume_key = session.resume_key();
376 let session_handle = SessionHandle {
377 open_tx,
378 close_tx,
379 resume_tx,
380 control_tx,
381 resume_key,
382 };
383 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
385 let pending = super::PendingConnection::with_caller_slot(
386 handle,
387 caller_slot.clone(),
388 config.operation_store,
389 );
390 peer_metadata.push(vox_types::MetadataEntry::str(
391 VOX_SERVICE_METADATA_KEY,
392 Client::SERVICE_NAME,
393 ));
394 let request = super::ConnectionRequest::new(&peer_metadata)?;
395 acceptor
396 .accept(&request, pending)
397 .map_err(SessionError::Rejected)?;
398 let caller =
399 caller_slot.lock().unwrap().take().expect(
400 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
401 );
402 let client = Client::from_vox_session(caller, Some(session_handle));
403 (config.spawn_fn)(Box::pin(async move { session.run().await }));
404 Ok(client)
405 }
406}
407
408pub struct SessionSourceInitiatorBuilder<'a, S> {
409 source: S,
410 mode: TransportMode,
411 config: SessionConfig<'a>,
412}
413
414impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
415 fn new(source: S, mode: TransportMode) -> Self {
416 let config = SessionConfig {
417 resumable: false,
418 ..SessionConfig::default()
419 };
420 Self {
421 source,
422 mode,
423 config,
424 }
425 }
426
427 pub fn parity(mut self, parity: Parity) -> Self {
428 self.config.root_settings.parity = parity;
429 self
430 }
431
432 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
433 self.config.root_settings = settings;
434 self
435 }
436
437 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
438 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
439 self
440 }
441
442 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
443 self.config.root_settings.initial_channel_credit = channel_capacity;
444 self
445 }
446
447 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
449 self.config.observer = Some(Arc::new(observer));
450 self
451 }
452
453 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
455 self.config.observer = Some(observer);
456 self
457 }
458
459 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
460 self.config.metadata = metadata;
461 self
462 }
463
464 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
465 self.config.on_connection = Some(Arc::new(acceptor));
466 self
467 }
468
469 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
470 self.config.keepalive = Some(keepalive);
471 self
472 }
473
474 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
475 self.config.connect_timeout = Some(timeout);
476 self
477 }
478
479 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
480 self.config.recovery_timeout = Some(timeout);
481 self
482 }
483
484 pub fn resumable(mut self) -> Self {
485 self.config.resumable = true;
486 self
487 }
488
489 pub fn non_resumable(mut self) -> Self {
496 self.config.resumable = false;
497 self
498 }
499
500 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
501 self.config.operation_store = Some(operation_store);
502 self
503 }
504
505 #[cfg(not(target_arch = "wasm32"))]
506 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
507 self.config.spawn_fn = Box::new(f);
508 self
509 }
510
511 #[cfg(target_arch = "wasm32")]
512 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
513 self.config.spawn_fn = Box::new(f);
514 self
515 }
516
517 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
518 where
519 S: LinkSource,
520 S::Link: Link + MaybeSend + 'static,
521 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
522 <S::Link as Link>::Rx: MaybeSend + 'static,
523 {
524 let connect_timeout = self.config.connect_timeout;
525 let fut = self.establish_inner::<Client>();
526 match connect_timeout {
527 Some(timeout) => time::timeout(timeout, fut)
528 .await
529 .map_err(|_| SessionError::ConnectTimeout)?,
530 None => fut.await,
531 }
532 }
533
534 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
535 where
536 S: LinkSource,
537 S::Link: Link + MaybeSend + 'static,
538 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
539 <S::Link as Link>::Rx: MaybeSend + 'static,
540 {
541 let Self {
542 mut source,
543 mode,
544 mut config,
545 } = self;
546 inject_service_metadata::<Client>(&mut config.metadata);
547 let _ = mode;
548
549 {
550 {
551 let attachment = source.next_link().await.map_err(SessionError::Io)?;
552 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
553 .await
554 .map_err(session_error_from_transport)?;
555 let handshake_result = handshake_as_initiator(
556 &link.tx,
557 &mut link.rx,
558 config.root_settings.clone(),
559 true,
560 None,
561 metadata_into_owned(config.metadata.clone()),
562 )
563 .await
564 .map_err(session_error_from_handshake)?;
565 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
566 .map_err(SessionError::Protocol)?;
567 let builder = SessionInitiatorBuilder::new(
568 BareConduit::with_message_plan(link, message_plan),
569 handshake_result,
570 );
571 let recoverer = Box::new(BareSourceRecoverer {
572 source,
573 settings: config.root_settings.clone(),
574 connect_timeout: config.connect_timeout,
575 metadata: metadata_into_owned(config.metadata.clone()),
576 });
577 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
578 builder,
579 config,
580 Some(recoverer),
581 )
582 .establish()
583 .await
584 }
585 }
586 }
587}
588
589pub struct SessionTransportInitiatorBuilder<'a, L> {
590 link: L,
591 mode: TransportMode,
592 config: SessionConfig<'a>,
593}
594
595impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
596 fn new(link: L, mode: TransportMode) -> Self {
597 let config = SessionConfig {
598 resumable: false,
599 ..SessionConfig::default()
600 };
601 Self { link, mode, config }
602 }
603
604 pub fn parity(mut self, parity: Parity) -> Self {
605 self.config.root_settings.parity = parity;
606 self
607 }
608
609 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
610 self.config.root_settings = settings;
611 self
612 }
613
614 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
615 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
616 self
617 }
618
619 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
620 self.config.root_settings.initial_channel_credit = channel_capacity;
621 self
622 }
623
624 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
626 self.config.observer = Some(Arc::new(observer));
627 self
628 }
629
630 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
632 self.config.observer = Some(observer);
633 self
634 }
635
636 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
637 self.config.metadata = metadata;
638 self
639 }
640
641 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
642 self.config.on_connection = Some(Arc::new(acceptor));
643 self
644 }
645
646 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
647 self.config.keepalive = Some(keepalive);
648 self
649 }
650
651 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
652 self.config.connect_timeout = Some(timeout);
653 self
654 }
655
656 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
657 self.config.recovery_timeout = Some(timeout);
658 self
659 }
660
661 pub fn resumable(mut self) -> Self {
662 self.config.resumable = true;
663 self
664 }
665
666 pub fn non_resumable(mut self) -> Self {
673 self.config.resumable = false;
674 self
675 }
676
677 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
678 self.config.operation_store = Some(operation_store);
679 self
680 }
681
682 #[cfg(not(target_arch = "wasm32"))]
683 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
684 self.config.spawn_fn = Box::new(f);
685 self
686 }
687
688 #[cfg(target_arch = "wasm32")]
689 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
690 self.config.spawn_fn = Box::new(f);
691 self
692 }
693
694 #[cfg(not(target_arch = "wasm32"))]
695 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
696 where
697 L: Link + Send + 'static,
698 L::Tx: MaybeSend + MaybeSync + 'static,
699 L::Rx: MaybeSend + 'static,
700 {
701 let connect_timeout = self.config.connect_timeout;
702 let fut = self.establish_inner::<Client>();
703 match connect_timeout {
704 Some(timeout) => vox_types::time::tokio::timeout(timeout, fut)
705 .await
706 .map_err(|_| SessionError::ConnectTimeout)?,
707 None => fut.await,
708 }
709 }
710
711 #[cfg(not(target_arch = "wasm32"))]
712 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
713 where
714 L: Link + Send + 'static,
715 L::Tx: MaybeSend + MaybeSync + 'static,
716 L::Rx: MaybeSend + 'static,
717 {
718 let Self {
719 link,
720 mode,
721 mut config,
722 } = self;
723 inject_service_metadata::<Client>(&mut config.metadata);
724 let _ = mode;
725 let link = initiate_transport(link, TransportMode::Bare)
726 .await
727 .map_err(session_error_from_transport)?;
728 Self::finish_with_bare_parts(link, config).await
729 }
730
731 #[cfg(target_arch = "wasm32")]
732 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
733 where
734 L: Link + 'static,
735 L::Tx: MaybeSend + MaybeSync + 'static,
736 L::Rx: MaybeSend + 'static,
737 {
738 let Self {
739 link,
740 mode,
741 mut config,
742 } = self;
743 inject_service_metadata::<Client>(&mut config.metadata);
744 match mode {
745 TransportMode::Bare => {
746 let link = initiate_transport(link, TransportMode::Bare)
747 .await
748 .map_err(session_error_from_transport)?;
749 Self::finish_with_bare_parts(link, config).await
750 }
751 }
752 }
753
754 async fn finish_with_bare_parts<Client: FromVoxSession>(
755 mut link: SplitLink<L::Tx, L::Rx>,
756 config: SessionConfig<'a>,
757 ) -> Result<Client, SessionError>
758 where
759 L: Link + 'static,
760 L::Tx: MaybeSend + MaybeSync + 'static,
761 L::Rx: MaybeSend + 'static,
762 {
763 let handshake_result = handshake_as_initiator(
764 &link.tx,
765 &mut link.rx,
766 config.root_settings.clone(),
767 true,
768 None,
769 metadata_into_owned(config.metadata.clone()),
770 )
771 .await
772 .map_err(session_error_from_handshake)?;
773 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
774 .map_err(SessionError::Protocol)?;
775 let builder = SessionInitiatorBuilder::new(
776 BareConduit::with_message_plan(link, message_plan),
777 handshake_result,
778 );
779 Self::apply_common_parts(builder, config, None)
780 .establish()
781 .await
782 }
783
784 #[allow(clippy::too_many_arguments)]
785 fn apply_common_parts<C>(
786 mut builder: SessionInitiatorBuilder<'a, C>,
787 config: SessionConfig<'a>,
788 recoverer: Option<Box<dyn ConduitRecoverer>>,
789 ) -> SessionInitiatorBuilder<'a, C> {
790 builder.config = config;
791 builder.recoverer = recoverer;
792 builder
793 }
794}
795
796struct BareSourceRecoverer<S> {
797 source: S,
798 settings: ConnectionSettings,
799 connect_timeout: Option<Duration>,
800 metadata: Metadata<'static>,
801}
802
803const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
804const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
805
806impl<S> ConduitRecoverer for BareSourceRecoverer<S>
807where
808 S: LinkSource,
809 S::Link: Link + MaybeSend + 'static,
810 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
811 <S::Link as Link>::Rx: MaybeSend + 'static,
812{
813 fn next_conduit<'a>(
814 &'a mut self,
815 resume_key: Option<&'a SessionResumeKey>,
816 ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
817 Box::pin(async move {
818 let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
819 let mut use_resume_key = resume_key.is_some();
820
821 loop {
822 let selected_resume_key = if use_resume_key { resume_key } else { None };
823
824 let attempt = async {
825 let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
826 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
827 .await
828 .map_err(session_error_from_transport)?;
829 let handshake_result = handshake_as_initiator(
830 &link.tx,
831 &mut link.rx,
832 self.settings.clone(),
833 true,
834 selected_resume_key,
835 metadata_into_owned(self.metadata.clone()),
836 )
837 .await
838 .map_err(session_error_from_handshake)?;
839 let conduit = BareConduit::<MessageFamily, _>::new(link);
840 let (tx, rx) = conduit.split();
841 Ok(super::RecoveredConduit {
842 tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
843 rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
844 handshake: handshake_result,
845 })
846 };
847
848 let result = match self.connect_timeout {
849 Some(timeout) => match time::timeout(timeout, attempt).await {
850 Ok(r) => r,
851 Err(_) => Err(SessionError::ConnectTimeout),
852 },
853 None => attempt.await,
854 };
855
856 match result {
857 Ok(conduit) => return Ok(conduit),
858 Err(e) if !e.is_retryable() => return Err(e),
859 Err(_) => {}
860 }
861
862 if use_resume_key {
863 use_resume_key = false;
866 }
867
868 time::sleep(backoff).await;
869 backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
870 }
871 })
872 }
873}
874
875pub struct SessionAcceptorBuilder<'a, C> {
876 conduit: C,
877 handshake_result: HandshakeResult,
878 config: SessionConfig<'a>,
879}
880
881impl<'a, C> SessionAcceptorBuilder<'a, C> {
882 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
883 let root_settings = handshake_result.our_settings.clone();
884 let mut config = SessionConfig::with_settings(root_settings);
885 config.resumable = false;
887 Self {
888 conduit,
889 handshake_result,
890 config,
891 }
892 }
893
894 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
895 self.config.on_connection = Some(Arc::new(acceptor));
896 self
897 }
898
899 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
900 self.config.keepalive = Some(keepalive);
901 self
902 }
903
904 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
905 self.config.root_settings.initial_channel_credit = channel_capacity;
906 self
907 }
908
909 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
911 self.config.observer = Some(Arc::new(observer));
912 self
913 }
914
915 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
917 self.config.observer = Some(observer);
918 self
919 }
920
921 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
922 self.config.connect_timeout = Some(timeout);
923 self
924 }
925
926 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
927 self.config.recovery_timeout = Some(timeout);
928 self
929 }
930
931 pub fn resumable(mut self) -> Self {
932 self.config.resumable = true;
933 self
934 }
935
936 pub fn non_resumable(mut self) -> Self {
943 self.config.resumable = false;
944 self
945 }
946
947 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
948 self.config.session_registry = Some(session_registry);
949 self
950 }
951
952 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
953 self.config.operation_store = Some(operation_store);
954 self
955 }
956
957 #[cfg(not(target_arch = "wasm32"))]
960 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
961 self.config.spawn_fn = Box::new(f);
962 self
963 }
964
965 #[cfg(target_arch = "wasm32")]
968 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
969 self.config.spawn_fn = Box::new(f);
970 self
971 }
972
973 #[moire::instrument]
974 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
975 where
976 C: Conduit<Msg = MessageFamily> + 'static,
977 C::Tx: MaybeSend + MaybeSync + 'static,
978 C::Rx: MaybeSend + 'static,
979 {
980 let Self {
981 conduit,
982 mut handshake_result,
983 config,
984 } = self;
985 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
986 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
987 let (tx, rx) = conduit.split();
988 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
989 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
990 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
991 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
992 let acceptor: Arc<dyn ConnectionAcceptor> =
993 config.on_connection.unwrap_or_else(|| Arc::new(()));
994 let mut session = Session::pre_handshake(
995 tx,
996 rx,
997 Some(acceptor.clone()),
998 open_rx,
999 close_rx,
1000 resume_rx,
1001 control_tx.clone(),
1002 control_rx,
1003 config.keepalive,
1004 config.resumable,
1005 None,
1006 config.recovery_timeout,
1007 config.observer.clone(),
1008 );
1009 let handle = session.establish_from_handshake(handshake_result)?;
1010 let resume_key = session.resume_key();
1011 let session_handle = SessionHandle {
1012 open_tx,
1013 close_tx,
1014 resume_tx,
1015 control_tx,
1016 resume_key,
1017 };
1018 if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1019 registry.insert(key, session_handle.clone());
1020 session.registered_in_registry = true;
1021 }
1022 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1024 let pending = super::PendingConnection::with_caller_slot(
1025 handle,
1026 caller_slot.clone(),
1027 config.operation_store,
1028 );
1029 peer_metadata.push(vox_types::MetadataEntry::str(
1030 VOX_SERVICE_METADATA_KEY,
1031 Client::SERVICE_NAME,
1032 ));
1033 let request = super::ConnectionRequest::new(&peer_metadata)?;
1034 acceptor
1035 .accept(&request, pending)
1036 .map_err(SessionError::Rejected)?;
1037 let caller =
1038 caller_slot.lock().unwrap().take().expect(
1039 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1040 );
1041 let client = Client::from_vox_session(caller, Some(session_handle));
1042 (config.spawn_fn)(Box::pin(async move { session.run().await }));
1043 Ok(client)
1044 }
1045
1046 #[moire::instrument]
1047 pub async fn establish_or_resume<Client: FromVoxSession>(
1048 self,
1049 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1050 where
1051 C: Conduit<Msg = MessageFamily> + 'static,
1052 C::Tx: MaybeSend + MaybeSync + 'static,
1053 C::Rx: MaybeSend + 'static,
1054 {
1055 if let (Some(registry), Some(resume_key)) = (
1059 &self.config.session_registry,
1060 self.handshake_result.peer_resume_key,
1061 ) && let Some(handle) = registry.get(&resume_key)
1062 {
1063 let (tx, rx) = self.conduit.split();
1064 if let Err(error) = handle
1065 .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1066 .await
1067 {
1068 registry.remove(&resume_key);
1069 return Err(error);
1070 }
1071 return Ok(SessionAcceptOutcome::Resumed);
1072 }
1073
1074 let client = self.establish().await?;
1075 Ok(SessionAcceptOutcome::Established(client))
1076 }
1077}
1078
1079pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1080 link: L,
1081 config: SessionConfig<'a>,
1082}
1083
1084impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1085 fn new(link: L) -> Self {
1086 Self {
1087 link,
1088 config: SessionConfig::with_settings(ConnectionSettings {
1089 parity: Parity::Even,
1090 max_concurrent_requests: 64,
1091 initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
1092 }),
1093 }
1094 }
1095
1096 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1097 self.config.root_settings = settings;
1098 self
1099 }
1100
1101 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1102 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1103 self
1104 }
1105
1106 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
1107 self.config.root_settings.initial_channel_credit = channel_capacity;
1108 self
1109 }
1110
1111 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
1113 self.config.observer = Some(Arc::new(observer));
1114 self
1115 }
1116
1117 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
1119 self.config.observer = Some(observer);
1120 self
1121 }
1122
1123 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1124 self.config.metadata = metadata;
1125 self
1126 }
1127
1128 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1129 self.config.on_connection = Some(Arc::new(acceptor));
1130 self
1131 }
1132
1133 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1134 self.config.keepalive = Some(keepalive);
1135 self
1136 }
1137
1138 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1139 self.config.connect_timeout = Some(timeout);
1140 self
1141 }
1142
1143 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1144 self.config.recovery_timeout = Some(timeout);
1145 self
1146 }
1147
1148 pub fn resumable(mut self) -> Self {
1149 self.config.resumable = true;
1150 self
1151 }
1152
1153 pub fn non_resumable(mut self) -> Self {
1160 self.config.resumable = false;
1161 self
1162 }
1163
1164 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1165 self.config.session_registry = Some(session_registry);
1166 self
1167 }
1168
1169 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1170 self.config.operation_store = Some(operation_store);
1171 self
1172 }
1173
1174 #[cfg(not(target_arch = "wasm32"))]
1175 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1176 self.config.spawn_fn = Box::new(f);
1177 self
1178 }
1179
1180 #[cfg(target_arch = "wasm32")]
1181 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1182 self.config.spawn_fn = Box::new(f);
1183 self
1184 }
1185
1186 #[moire::instrument]
1187 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1188 where
1189 L: Link + MaybeSend + 'static,
1190 L::Tx: MaybeSend + MaybeSync + 'static,
1191 L::Rx: MaybeSend + 'static,
1192 {
1193 let Self { link, mut config } = self;
1194 inject_service_metadata::<Client>(&mut config.metadata);
1195 let (mode, mut link) = accept_transport(link)
1196 .await
1197 .map_err(session_error_from_transport)?;
1198 match mode {
1199 TransportMode::Bare => {
1200 let handshake_result = handshake_as_acceptor(
1201 &link.tx,
1202 &mut link.rx,
1203 config.root_settings.clone(),
1204 true,
1205 config.resumable,
1206 None,
1207 metadata_into_owned(config.metadata.clone()),
1208 )
1209 .await
1210 .map_err(session_error_from_handshake)?;
1211 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1212 .map_err(SessionError::Protocol)?;
1213 let builder = SessionAcceptorBuilder::new(
1214 BareConduit::with_message_plan(link, message_plan),
1215 handshake_result,
1216 );
1217 Self::apply_common_parts(builder, config).establish().await
1218 }
1219 }
1220 }
1221
1222 #[moire::instrument]
1223 pub async fn establish_or_resume<Client: FromVoxSession>(
1224 self,
1225 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1226 where
1227 L: Link + MaybeSend + 'static,
1228 L::Tx: MaybeSend + MaybeSync + 'static,
1229 L::Rx: MaybeSend + 'static,
1230 {
1231 let Self { link, config } = self;
1232 let (mode, mut link) = accept_transport(link)
1233 .await
1234 .map_err(session_error_from_transport)?;
1235 match mode {
1236 TransportMode::Bare => {
1237 let handshake_result = handshake_as_acceptor(
1238 &link.tx,
1239 &mut link.rx,
1240 config.root_settings.clone(),
1241 true,
1242 config.resumable,
1243 None,
1244 metadata_into_owned(config.metadata.clone()),
1245 )
1246 .await
1247 .map_err(session_error_from_handshake)?;
1248 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1249 .map_err(SessionError::Protocol)?;
1250 let builder = SessionAcceptorBuilder::new(
1251 BareConduit::with_message_plan(link, message_plan),
1252 handshake_result,
1253 );
1254 Self::apply_common_parts(builder, config)
1255 .establish_or_resume()
1256 .await
1257 }
1258 }
1259 }
1260
1261 fn apply_common_parts<C>(
1262 mut builder: SessionAcceptorBuilder<'a, C>,
1263 config: SessionConfig<'a>,
1264 ) -> SessionAcceptorBuilder<'a, C> {
1265 builder.config = config;
1266 builder
1267 }
1268}
1269
1270fn validate_negotiated_root_settings(
1271 expected_root_settings: &ConnectionSettings,
1272 handshake_result: &HandshakeResult,
1273) -> Result<(), SessionError> {
1274 if expected_root_settings.initial_channel_credit == 0
1275 || handshake_result.peer_settings.initial_channel_credit == 0
1276 {
1277 return Err(SessionError::Protocol(
1278 "initial_channel_credit must be greater than zero".into(),
1279 ));
1280 }
1281
1282 if handshake_result.our_settings != *expected_root_settings {
1283 return Err(SessionError::Protocol(
1284 "negotiated root settings do not match builder settings".into(),
1285 ));
1286 }
1287 Ok(())
1288}
1289
1290fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1291 match error {
1292 crate::HandshakeError::Io(io) => SessionError::Io(io),
1293 crate::HandshakeError::PeerClosed => {
1294 SessionError::Protocol("peer closed during handshake".into())
1295 }
1296 crate::HandshakeError::NotResumable => SessionError::NotResumable,
1297 other => SessionError::Protocol(other.to_string()),
1298 }
1299}
1300
1301fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1302 match error {
1303 crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1304 crate::TransportPrologueError::LinkDead => {
1305 SessionError::Protocol("link closed during transport prologue".into())
1306 }
1307 crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1308 crate::TransportPrologueError::Rejected(reason) => {
1309 SessionError::Protocol(format!("transport rejected: {reason}"))
1310 }
1311 }
1312}