1use std::{
2 collections::HashMap,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12 Conduit, ConnectionSettings, DEFAULT_INITIAL_CHANNEL_CREDIT, HandshakeResult, Link, MaybeSend,
13 MaybeSync, MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink, VoxObserver,
14 VoxObserverHandle, metadata_into_owned,
15};
16
17use crate::LinkSource;
18use crate::{
19 BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
20 handshake_as_acceptor, handshake_as_initiator, initiate_transport,
21};
22
23use super::{
24 CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
25 SessionHandle, SessionKeepaliveConfig,
26};
27use crate::FromVoxSession;
28
29pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
31
32fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
34 metadata.push(vox_types::MetadataEntry {
35 key: VOX_SERVICE_METADATA_KEY.into(),
36 value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
37 flags: vox_types::MetadataFlags::NONE,
38 });
39}
40
41#[cfg(not(target_arch = "wasm32"))]
44pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
45#[cfg(target_arch = "wasm32")]
46pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
47
48#[cfg(not(target_arch = "wasm32"))]
49type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
50#[cfg(target_arch = "wasm32")]
51type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
52
53#[cfg(not(target_arch = "wasm32"))]
54fn default_spawn_fn() -> SpawnFn {
55 Box::new(|fut| {
56 tokio::spawn(fut);
57 })
58}
59
60#[cfg(target_arch = "wasm32")]
61fn default_spawn_fn() -> SpawnFn {
62 Box::new(|fut| {
63 wasm_bindgen_futures::spawn_local(fut);
64 })
65}
66
67pub fn initiator_conduit<I: IntoConduit>(
70 into_conduit: I,
71 handshake_result: HandshakeResult,
72) -> SessionInitiatorBuilder<'static, I::Conduit> {
73 SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
74}
75
76pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
77where
78 S: LinkSource,
79{
80 SessionSourceInitiatorBuilder::new(source, mode)
81}
82
83pub fn acceptor_conduit<I: IntoConduit>(
84 into_conduit: I,
85 handshake_result: HandshakeResult,
86) -> SessionAcceptorBuilder<'static, I::Conduit> {
87 SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
88}
89
90pub async fn initiator_on_link<L: Link>(
93 link: L,
94 settings: ConnectionSettings,
95) -> Result<
96 SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
97 SessionError,
98>
99where
100 L::Tx: MaybeSend + MaybeSync + 'static,
101 L::Rx: MaybeSend + 'static,
102{
103 let (tx, mut rx) = link.split();
104 let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
105 .await
106 .map_err(session_error_from_handshake)?;
107 let message_plan =
108 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
109 Ok(SessionInitiatorBuilder::new(
110 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
111 handshake_result,
112 ))
113}
114
115pub async fn acceptor_on_link<L: Link>(
118 link: L,
119 settings: ConnectionSettings,
120) -> Result<
121 SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
122 SessionError,
123>
124where
125 L::Tx: MaybeSend + MaybeSync + 'static,
126 L::Rx: MaybeSend + 'static,
127{
128 let (tx, mut rx) = link.split();
129 let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
130 .await
131 .map_err(session_error_from_handshake)?;
132 let message_plan =
133 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
134 Ok(SessionAcceptorBuilder::new(
135 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
136 handshake_result,
137 ))
138}
139
140pub fn initiator_on<L: Link>(
141 link: L,
142 mode: TransportMode,
143) -> SessionTransportInitiatorBuilder<'static, L> {
144 SessionTransportInitiatorBuilder::new(link, mode)
145}
146
147pub fn initiator_transport<L: Link>(
148 link: L,
149 mode: TransportMode,
150) -> SessionTransportInitiatorBuilder<'static, L> {
151 initiator_on(link, mode)
152}
153
154pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
155 SessionTransportAcceptorBuilder::new(link)
156}
157
158pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
159 acceptor_on(link)
160}
161
162#[derive(Clone, Default)]
163pub struct SessionRegistry {
164 inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
165}
166
167impl SessionRegistry {
168 fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
169 self.inner
170 .lock()
171 .expect("session registry poisoned")
172 .get(key)
173 .cloned()
174 }
175
176 fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
177 self.inner
178 .lock()
179 .expect("session registry poisoned")
180 .insert(key, handle);
181 }
182
183 fn remove(&self, key: &SessionResumeKey) {
184 self.inner
185 .lock()
186 .expect("session registry poisoned")
187 .remove(key);
188 }
189}
190
191pub enum SessionAcceptOutcome<Client> {
192 Established(Client),
193 Resumed,
194}
195
196pub struct SessionConfig<'a> {
198 pub root_settings: ConnectionSettings,
199 pub metadata: Metadata<'a>,
200 pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
201 pub keepalive: Option<SessionKeepaliveConfig>,
202 pub resumable: bool,
203 pub session_registry: Option<SessionRegistry>,
204 pub operation_store: Option<Arc<dyn OperationStore>>,
205 pub spawn_fn: SpawnFn,
206 pub connect_timeout: Option<std::time::Duration>,
207 pub recovery_timeout: Option<std::time::Duration>,
208 pub observer: Option<VoxObserverHandle>,
209}
210
211impl SessionConfig<'_> {
212 fn with_settings(root_settings: ConnectionSettings) -> Self {
213 Self {
214 root_settings,
215 metadata: vec![],
216 on_connection: None,
217 keepalive: None,
218 resumable: true,
219 session_registry: None,
220 operation_store: None,
221 spawn_fn: default_spawn_fn(),
222 connect_timeout: None,
223 recovery_timeout: None,
224 observer: None,
225 }
226 }
227}
228
229impl Default for SessionConfig<'_> {
230 fn default() -> Self {
231 Self::with_settings(ConnectionSettings {
232 parity: Parity::Odd,
233 max_concurrent_requests: 64,
234 initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
235 })
236 }
237}
238
239pub struct SessionInitiatorBuilder<'a, C> {
240 conduit: C,
241 handshake_result: HandshakeResult,
242 config: SessionConfig<'a>,
243 recoverer: Option<Box<dyn ConduitRecoverer>>,
244}
245
246impl<'a, C> SessionInitiatorBuilder<'a, C> {
247 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
248 let root_settings = handshake_result.our_settings.clone();
249 let mut config = SessionConfig::with_settings(root_settings);
250 config.resumable = false;
252 Self {
253 conduit,
254 handshake_result,
255 config,
256 recoverer: None,
257 }
258 }
259
260 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
261 self.config.on_connection = Some(Arc::new(acceptor));
262 self
263 }
264
265 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
266 self.config.keepalive = Some(keepalive);
267 self
268 }
269
270 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
271 self.config.root_settings.initial_channel_credit = channel_capacity;
272 self
273 }
274
275 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
277 self.config.observer = Some(Arc::new(observer));
278 self
279 }
280
281 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
283 self.config.observer = Some(observer);
284 self
285 }
286
287 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
288 self.config.connect_timeout = Some(timeout);
289 self
290 }
291
292 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
293 self.config.recovery_timeout = Some(timeout);
294 self
295 }
296
297 pub fn resumable(mut self) -> Self {
298 self.config.resumable = true;
299 self
300 }
301
302 pub fn non_resumable(mut self) -> Self {
309 self.config.resumable = false;
310 self
311 }
312
313 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
314 self.config.operation_store = Some(operation_store);
315 self
316 }
317
318 #[cfg(not(target_arch = "wasm32"))]
321 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
322 self.config.spawn_fn = Box::new(f);
323 self
324 }
325
326 #[cfg(target_arch = "wasm32")]
329 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
330 self.config.spawn_fn = Box::new(f);
331 self
332 }
333
334 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
339 where
340 C: Conduit<Msg = MessageFamily> + 'static,
341 C::Tx: MaybeSend + MaybeSync + 'static,
342 C::Rx: MaybeSend + 'static,
343 {
344 let Self {
345 conduit,
346 mut handshake_result,
347 config,
348 recoverer,
349 } = self;
350 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
351 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
352 let (tx, rx) = conduit.split();
353 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
354 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
355 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
356 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
357 let acceptor: Arc<dyn ConnectionAcceptor> =
358 config.on_connection.unwrap_or_else(|| Arc::new(()));
359 let mut session = Session::pre_handshake(
360 tx,
361 rx,
362 Some(acceptor.clone()),
363 open_rx,
364 close_rx,
365 resume_rx,
366 control_tx.clone(),
367 control_rx,
368 config.keepalive,
369 config.resumable,
370 recoverer,
371 config.recovery_timeout,
372 config.observer.clone(),
373 );
374 let handle = session.establish_from_handshake(handshake_result)?;
375 let resume_key = session.resume_key();
376 let session_handle = SessionHandle {
377 open_tx,
378 close_tx,
379 resume_tx,
380 control_tx,
381 resume_key,
382 };
383 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
385 let pending = super::PendingConnection::with_caller_slot(
386 handle,
387 caller_slot.clone(),
388 config.operation_store,
389 );
390 peer_metadata.push(vox_types::MetadataEntry::str(
391 VOX_SERVICE_METADATA_KEY,
392 Client::SERVICE_NAME,
393 ));
394 let request = super::ConnectionRequest::new(&peer_metadata)?;
395 tracing::debug!(
396 service = Client::SERVICE_NAME,
397 "vox root connection routing to acceptor"
398 );
399 match acceptor.accept(&request, pending) {
400 Ok(()) => tracing::debug!(
401 service = Client::SERVICE_NAME,
402 "vox root connection accepted"
403 ),
404 Err(metadata) => {
405 tracing::debug!(
406 service = Client::SERVICE_NAME,
407 metadata_len = metadata.len(),
408 "vox root connection rejected"
409 );
410 return Err(SessionError::Rejected(metadata));
411 }
412 }
413 let caller =
414 caller_slot.lock().unwrap().take().expect(
415 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
416 );
417 let client = Client::from_vox_session(caller, Some(session_handle));
418 (config.spawn_fn)(Box::pin(async move { session.run().await }));
419 Ok(client)
420 }
421}
422
423pub struct SessionSourceInitiatorBuilder<'a, S> {
424 source: S,
425 mode: TransportMode,
426 config: SessionConfig<'a>,
427}
428
429impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
430 fn new(source: S, mode: TransportMode) -> Self {
431 let config = SessionConfig {
432 resumable: false,
433 ..SessionConfig::default()
434 };
435 Self {
436 source,
437 mode,
438 config,
439 }
440 }
441
442 pub fn parity(mut self, parity: Parity) -> Self {
443 self.config.root_settings.parity = parity;
444 self
445 }
446
447 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
448 self.config.root_settings = settings;
449 self
450 }
451
452 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
453 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
454 self
455 }
456
457 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
458 self.config.root_settings.initial_channel_credit = channel_capacity;
459 self
460 }
461
462 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
464 self.config.observer = Some(Arc::new(observer));
465 self
466 }
467
468 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
470 self.config.observer = Some(observer);
471 self
472 }
473
474 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
475 self.config.metadata = metadata;
476 self
477 }
478
479 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
480 self.config.on_connection = Some(Arc::new(acceptor));
481 self
482 }
483
484 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
485 self.config.keepalive = Some(keepalive);
486 self
487 }
488
489 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
490 self.config.connect_timeout = Some(timeout);
491 self
492 }
493
494 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
495 self.config.recovery_timeout = Some(timeout);
496 self
497 }
498
499 pub fn resumable(mut self) -> Self {
500 self.config.resumable = true;
501 self
502 }
503
504 pub fn non_resumable(mut self) -> Self {
511 self.config.resumable = false;
512 self
513 }
514
515 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
516 self.config.operation_store = Some(operation_store);
517 self
518 }
519
520 #[cfg(not(target_arch = "wasm32"))]
521 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
522 self.config.spawn_fn = Box::new(f);
523 self
524 }
525
526 #[cfg(target_arch = "wasm32")]
527 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
528 self.config.spawn_fn = Box::new(f);
529 self
530 }
531
532 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
533 where
534 S: LinkSource,
535 S::Link: Link + MaybeSend + 'static,
536 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
537 <S::Link as Link>::Rx: MaybeSend + 'static,
538 {
539 let connect_timeout = self.config.connect_timeout;
540 let fut = self.establish_inner::<Client>();
541 match connect_timeout {
542 Some(timeout) => time::timeout(timeout, fut)
543 .await
544 .map_err(|_| SessionError::ConnectTimeout)?,
545 None => fut.await,
546 }
547 }
548
549 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
550 where
551 S: LinkSource,
552 S::Link: Link + MaybeSend + 'static,
553 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
554 <S::Link as Link>::Rx: MaybeSend + 'static,
555 {
556 let Self {
557 mut source,
558 mode,
559 mut config,
560 } = self;
561 inject_service_metadata::<Client>(&mut config.metadata);
562 let _ = mode;
563
564 {
565 {
566 let attachment = source.next_link().await.map_err(SessionError::Io)?;
567 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
568 .await
569 .map_err(session_error_from_transport)?;
570 let handshake_result = handshake_as_initiator(
571 &link.tx,
572 &mut link.rx,
573 config.root_settings.clone(),
574 true,
575 None,
576 metadata_into_owned(config.metadata.clone()),
577 )
578 .await
579 .map_err(session_error_from_handshake)?;
580 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
581 .map_err(SessionError::Protocol)?;
582 let builder = SessionInitiatorBuilder::new(
583 BareConduit::with_message_plan(link, message_plan),
584 handshake_result,
585 );
586 let recoverer = Box::new(BareSourceRecoverer {
587 source,
588 settings: config.root_settings.clone(),
589 connect_timeout: config.connect_timeout,
590 metadata: metadata_into_owned(config.metadata.clone()),
591 });
592 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
593 builder,
594 config,
595 Some(recoverer),
596 )
597 .establish()
598 .await
599 }
600 }
601 }
602}
603
604pub struct SessionTransportInitiatorBuilder<'a, L> {
605 link: L,
606 mode: TransportMode,
607 config: SessionConfig<'a>,
608}
609
610impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
611 fn new(link: L, mode: TransportMode) -> Self {
612 let config = SessionConfig {
613 resumable: false,
614 ..SessionConfig::default()
615 };
616 Self { link, mode, config }
617 }
618
619 pub fn parity(mut self, parity: Parity) -> Self {
620 self.config.root_settings.parity = parity;
621 self
622 }
623
624 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
625 self.config.root_settings = settings;
626 self
627 }
628
629 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
630 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
631 self
632 }
633
634 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
635 self.config.root_settings.initial_channel_credit = channel_capacity;
636 self
637 }
638
639 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
641 self.config.observer = Some(Arc::new(observer));
642 self
643 }
644
645 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
647 self.config.observer = Some(observer);
648 self
649 }
650
651 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
652 self.config.metadata = metadata;
653 self
654 }
655
656 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
657 self.config.on_connection = Some(Arc::new(acceptor));
658 self
659 }
660
661 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
662 self.config.keepalive = Some(keepalive);
663 self
664 }
665
666 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
667 self.config.connect_timeout = Some(timeout);
668 self
669 }
670
671 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
672 self.config.recovery_timeout = Some(timeout);
673 self
674 }
675
676 pub fn resumable(mut self) -> Self {
677 self.config.resumable = true;
678 self
679 }
680
681 pub fn non_resumable(mut self) -> Self {
688 self.config.resumable = false;
689 self
690 }
691
692 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
693 self.config.operation_store = Some(operation_store);
694 self
695 }
696
697 #[cfg(not(target_arch = "wasm32"))]
698 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
699 self.config.spawn_fn = Box::new(f);
700 self
701 }
702
703 #[cfg(target_arch = "wasm32")]
704 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
705 self.config.spawn_fn = Box::new(f);
706 self
707 }
708
709 #[cfg(not(target_arch = "wasm32"))]
710 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
711 where
712 L: Link + Send + 'static,
713 L::Tx: MaybeSend + MaybeSync + 'static,
714 L::Rx: MaybeSend + 'static,
715 {
716 let connect_timeout = self.config.connect_timeout;
717 let fut = self.establish_inner::<Client>();
718 match connect_timeout {
719 Some(timeout) => vox_types::time::tokio::timeout(timeout, fut)
720 .await
721 .map_err(|_| SessionError::ConnectTimeout)?,
722 None => fut.await,
723 }
724 }
725
726 #[cfg(not(target_arch = "wasm32"))]
727 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
728 where
729 L: Link + Send + 'static,
730 L::Tx: MaybeSend + MaybeSync + 'static,
731 L::Rx: MaybeSend + 'static,
732 {
733 let Self {
734 link,
735 mode,
736 mut config,
737 } = self;
738 inject_service_metadata::<Client>(&mut config.metadata);
739 let _ = mode;
740 let link = initiate_transport(link, TransportMode::Bare)
741 .await
742 .map_err(session_error_from_transport)?;
743 Self::finish_with_bare_parts(link, config).await
744 }
745
746 #[cfg(target_arch = "wasm32")]
747 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
748 where
749 L: Link + 'static,
750 L::Tx: MaybeSend + MaybeSync + 'static,
751 L::Rx: MaybeSend + 'static,
752 {
753 let Self {
754 link,
755 mode,
756 mut config,
757 } = self;
758 inject_service_metadata::<Client>(&mut config.metadata);
759 match mode {
760 TransportMode::Bare => {
761 let link = initiate_transport(link, TransportMode::Bare)
762 .await
763 .map_err(session_error_from_transport)?;
764 Self::finish_with_bare_parts(link, config).await
765 }
766 }
767 }
768
769 async fn finish_with_bare_parts<Client: FromVoxSession>(
770 mut link: SplitLink<L::Tx, L::Rx>,
771 config: SessionConfig<'a>,
772 ) -> Result<Client, SessionError>
773 where
774 L: Link + 'static,
775 L::Tx: MaybeSend + MaybeSync + 'static,
776 L::Rx: MaybeSend + 'static,
777 {
778 let handshake_result = handshake_as_initiator(
779 &link.tx,
780 &mut link.rx,
781 config.root_settings.clone(),
782 true,
783 None,
784 metadata_into_owned(config.metadata.clone()),
785 )
786 .await
787 .map_err(session_error_from_handshake)?;
788 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
789 .map_err(SessionError::Protocol)?;
790 let builder = SessionInitiatorBuilder::new(
791 BareConduit::with_message_plan(link, message_plan),
792 handshake_result,
793 );
794 Self::apply_common_parts(builder, config, None)
795 .establish()
796 .await
797 }
798
799 #[allow(clippy::too_many_arguments)]
800 fn apply_common_parts<C>(
801 mut builder: SessionInitiatorBuilder<'a, C>,
802 config: SessionConfig<'a>,
803 recoverer: Option<Box<dyn ConduitRecoverer>>,
804 ) -> SessionInitiatorBuilder<'a, C> {
805 builder.config = config;
806 builder.recoverer = recoverer;
807 builder
808 }
809}
810
811struct BareSourceRecoverer<S> {
812 source: S,
813 settings: ConnectionSettings,
814 connect_timeout: Option<Duration>,
815 metadata: Metadata<'static>,
816}
817
818const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
819const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
820
821impl<S> ConduitRecoverer for BareSourceRecoverer<S>
822where
823 S: LinkSource,
824 S::Link: Link + MaybeSend + 'static,
825 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
826 <S::Link as Link>::Rx: MaybeSend + 'static,
827{
828 fn next_conduit<'a>(
829 &'a mut self,
830 resume_key: Option<&'a SessionResumeKey>,
831 ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
832 Box::pin(async move {
833 let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
834 let mut use_resume_key = resume_key.is_some();
835
836 loop {
837 let selected_resume_key = if use_resume_key { resume_key } else { None };
838
839 let attempt = async {
840 let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
841 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
842 .await
843 .map_err(session_error_from_transport)?;
844 let handshake_result = handshake_as_initiator(
845 &link.tx,
846 &mut link.rx,
847 self.settings.clone(),
848 true,
849 selected_resume_key,
850 metadata_into_owned(self.metadata.clone()),
851 )
852 .await
853 .map_err(session_error_from_handshake)?;
854 let conduit = BareConduit::<MessageFamily, _>::new(link);
855 let (tx, rx) = conduit.split();
856 Ok(super::RecoveredConduit {
857 tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
858 rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
859 handshake: handshake_result,
860 })
861 };
862
863 let result = match self.connect_timeout {
864 Some(timeout) => match time::timeout(timeout, attempt).await {
865 Ok(r) => r,
866 Err(_) => Err(SessionError::ConnectTimeout),
867 },
868 None => attempt.await,
869 };
870
871 match result {
872 Ok(conduit) => return Ok(conduit),
873 Err(e) if !e.is_retryable() => return Err(e),
874 Err(_) => {}
875 }
876
877 if use_resume_key {
878 use_resume_key = false;
881 }
882
883 time::sleep(backoff).await;
884 backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
885 }
886 })
887 }
888}
889
890pub struct SessionAcceptorBuilder<'a, C> {
891 conduit: C,
892 handshake_result: HandshakeResult,
893 config: SessionConfig<'a>,
894}
895
896impl<'a, C> SessionAcceptorBuilder<'a, C> {
897 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
898 let root_settings = handshake_result.our_settings.clone();
899 let mut config = SessionConfig::with_settings(root_settings);
900 config.resumable = false;
902 Self {
903 conduit,
904 handshake_result,
905 config,
906 }
907 }
908
909 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
910 self.config.on_connection = Some(Arc::new(acceptor));
911 self
912 }
913
914 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
915 self.config.keepalive = Some(keepalive);
916 self
917 }
918
919 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
920 self.config.root_settings.initial_channel_credit = channel_capacity;
921 self
922 }
923
924 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
926 self.config.observer = Some(Arc::new(observer));
927 self
928 }
929
930 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
932 self.config.observer = Some(observer);
933 self
934 }
935
936 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
937 self.config.connect_timeout = Some(timeout);
938 self
939 }
940
941 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
942 self.config.recovery_timeout = Some(timeout);
943 self
944 }
945
946 pub fn resumable(mut self) -> Self {
947 self.config.resumable = true;
948 self
949 }
950
951 pub fn non_resumable(mut self) -> Self {
958 self.config.resumable = false;
959 self
960 }
961
962 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
963 self.config.session_registry = Some(session_registry);
964 self
965 }
966
967 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
968 self.config.operation_store = Some(operation_store);
969 self
970 }
971
972 #[cfg(not(target_arch = "wasm32"))]
975 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
976 self.config.spawn_fn = Box::new(f);
977 self
978 }
979
980 #[cfg(target_arch = "wasm32")]
983 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
984 self.config.spawn_fn = Box::new(f);
985 self
986 }
987
988 #[moire::instrument]
989 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
990 where
991 C: Conduit<Msg = MessageFamily> + 'static,
992 C::Tx: MaybeSend + MaybeSync + 'static,
993 C::Rx: MaybeSend + 'static,
994 {
995 let Self {
996 conduit,
997 mut handshake_result,
998 config,
999 } = self;
1000 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
1001 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
1002 let (tx, rx) = conduit.split();
1003 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
1004 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
1005 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
1006 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
1007 let acceptor: Arc<dyn ConnectionAcceptor> =
1008 config.on_connection.unwrap_or_else(|| Arc::new(()));
1009 let mut session = Session::pre_handshake(
1010 tx,
1011 rx,
1012 Some(acceptor.clone()),
1013 open_rx,
1014 close_rx,
1015 resume_rx,
1016 control_tx.clone(),
1017 control_rx,
1018 config.keepalive,
1019 config.resumable,
1020 None,
1021 config.recovery_timeout,
1022 config.observer.clone(),
1023 );
1024 let handle = session.establish_from_handshake(handshake_result)?;
1025 let resume_key = session.resume_key();
1026 let session_handle = SessionHandle {
1027 open_tx,
1028 close_tx,
1029 resume_tx,
1030 control_tx,
1031 resume_key,
1032 };
1033 if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1034 registry.insert(key, session_handle.clone());
1035 session.registered_in_registry = true;
1036 }
1037 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1039 let pending = super::PendingConnection::with_caller_slot(
1040 handle,
1041 caller_slot.clone(),
1042 config.operation_store,
1043 );
1044 peer_metadata.push(vox_types::MetadataEntry::str(
1045 VOX_SERVICE_METADATA_KEY,
1046 Client::SERVICE_NAME,
1047 ));
1048 let request = super::ConnectionRequest::new(&peer_metadata)?;
1049 tracing::debug!(
1050 service = Client::SERVICE_NAME,
1051 "vox root connection routing to acceptor"
1052 );
1053 match acceptor.accept(&request, pending) {
1054 Ok(()) => tracing::debug!(
1055 service = Client::SERVICE_NAME,
1056 "vox root connection accepted"
1057 ),
1058 Err(metadata) => {
1059 tracing::debug!(
1060 service = Client::SERVICE_NAME,
1061 metadata_len = metadata.len(),
1062 "vox root connection rejected"
1063 );
1064 return Err(SessionError::Rejected(metadata));
1065 }
1066 }
1067 let caller =
1068 caller_slot.lock().unwrap().take().expect(
1069 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1070 );
1071 let client = Client::from_vox_session(caller, Some(session_handle));
1072 (config.spawn_fn)(Box::pin(async move { session.run().await }));
1073 Ok(client)
1074 }
1075
1076 #[moire::instrument]
1077 pub async fn establish_or_resume<Client: FromVoxSession>(
1078 self,
1079 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1080 where
1081 C: Conduit<Msg = MessageFamily> + 'static,
1082 C::Tx: MaybeSend + MaybeSync + 'static,
1083 C::Rx: MaybeSend + 'static,
1084 {
1085 if let (Some(registry), Some(resume_key)) = (
1089 &self.config.session_registry,
1090 self.handshake_result.peer_resume_key,
1091 ) && let Some(handle) = registry.get(&resume_key)
1092 {
1093 let (tx, rx) = self.conduit.split();
1094 if let Err(error) = handle
1095 .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1096 .await
1097 {
1098 registry.remove(&resume_key);
1099 return Err(error);
1100 }
1101 return Ok(SessionAcceptOutcome::Resumed);
1102 }
1103
1104 let client = self.establish().await?;
1105 Ok(SessionAcceptOutcome::Established(client))
1106 }
1107}
1108
1109pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1110 link: L,
1111 config: SessionConfig<'a>,
1112}
1113
1114impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1115 fn new(link: L) -> Self {
1116 Self {
1117 link,
1118 config: SessionConfig::with_settings(ConnectionSettings {
1119 parity: Parity::Even,
1120 max_concurrent_requests: 64,
1121 initial_channel_credit: DEFAULT_INITIAL_CHANNEL_CREDIT,
1122 }),
1123 }
1124 }
1125
1126 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1127 self.config.root_settings = settings;
1128 self
1129 }
1130
1131 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1132 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1133 self
1134 }
1135
1136 pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
1137 self.config.root_settings.initial_channel_credit = channel_capacity;
1138 self
1139 }
1140
1141 pub fn observer(mut self, observer: impl VoxObserver) -> Self {
1143 self.config.observer = Some(Arc::new(observer));
1144 self
1145 }
1146
1147 pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
1149 self.config.observer = Some(observer);
1150 self
1151 }
1152
1153 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1154 self.config.metadata = metadata;
1155 self
1156 }
1157
1158 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1159 self.config.on_connection = Some(Arc::new(acceptor));
1160 self
1161 }
1162
1163 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1164 self.config.keepalive = Some(keepalive);
1165 self
1166 }
1167
1168 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1169 self.config.connect_timeout = Some(timeout);
1170 self
1171 }
1172
1173 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1174 self.config.recovery_timeout = Some(timeout);
1175 self
1176 }
1177
1178 pub fn resumable(mut self) -> Self {
1179 self.config.resumable = true;
1180 self
1181 }
1182
1183 pub fn non_resumable(mut self) -> Self {
1190 self.config.resumable = false;
1191 self
1192 }
1193
1194 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1195 self.config.session_registry = Some(session_registry);
1196 self
1197 }
1198
1199 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1200 self.config.operation_store = Some(operation_store);
1201 self
1202 }
1203
1204 #[cfg(not(target_arch = "wasm32"))]
1205 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1206 self.config.spawn_fn = Box::new(f);
1207 self
1208 }
1209
1210 #[cfg(target_arch = "wasm32")]
1211 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1212 self.config.spawn_fn = Box::new(f);
1213 self
1214 }
1215
1216 #[moire::instrument]
1217 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1218 where
1219 L: Link + MaybeSend + 'static,
1220 L::Tx: MaybeSend + MaybeSync + 'static,
1221 L::Rx: MaybeSend + 'static,
1222 {
1223 let Self { link, mut config } = self;
1224 inject_service_metadata::<Client>(&mut config.metadata);
1225 let (mode, mut link) = accept_transport(link)
1226 .await
1227 .map_err(session_error_from_transport)?;
1228 match mode {
1229 TransportMode::Bare => {
1230 let handshake_result = handshake_as_acceptor(
1231 &link.tx,
1232 &mut link.rx,
1233 config.root_settings.clone(),
1234 true,
1235 config.resumable,
1236 None,
1237 metadata_into_owned(config.metadata.clone()),
1238 )
1239 .await
1240 .map_err(session_error_from_handshake)?;
1241 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1242 .map_err(SessionError::Protocol)?;
1243 let builder = SessionAcceptorBuilder::new(
1244 BareConduit::with_message_plan(link, message_plan),
1245 handshake_result,
1246 );
1247 Self::apply_common_parts(builder, config).establish().await
1248 }
1249 }
1250 }
1251
1252 #[moire::instrument]
1253 pub async fn establish_or_resume<Client: FromVoxSession>(
1254 self,
1255 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1256 where
1257 L: Link + MaybeSend + 'static,
1258 L::Tx: MaybeSend + MaybeSync + 'static,
1259 L::Rx: MaybeSend + 'static,
1260 {
1261 let Self { link, config } = self;
1262 let (mode, mut link) = accept_transport(link)
1263 .await
1264 .map_err(session_error_from_transport)?;
1265 match mode {
1266 TransportMode::Bare => {
1267 let handshake_result = handshake_as_acceptor(
1268 &link.tx,
1269 &mut link.rx,
1270 config.root_settings.clone(),
1271 true,
1272 config.resumable,
1273 None,
1274 metadata_into_owned(config.metadata.clone()),
1275 )
1276 .await
1277 .map_err(session_error_from_handshake)?;
1278 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1279 .map_err(SessionError::Protocol)?;
1280 let builder = SessionAcceptorBuilder::new(
1281 BareConduit::with_message_plan(link, message_plan),
1282 handshake_result,
1283 );
1284 Self::apply_common_parts(builder, config)
1285 .establish_or_resume()
1286 .await
1287 }
1288 }
1289 }
1290
1291 fn apply_common_parts<C>(
1292 mut builder: SessionAcceptorBuilder<'a, C>,
1293 config: SessionConfig<'a>,
1294 ) -> SessionAcceptorBuilder<'a, C> {
1295 builder.config = config;
1296 builder
1297 }
1298}
1299
1300fn validate_negotiated_root_settings(
1301 expected_root_settings: &ConnectionSettings,
1302 handshake_result: &HandshakeResult,
1303) -> Result<(), SessionError> {
1304 if expected_root_settings.initial_channel_credit == 0
1305 || handshake_result.peer_settings.initial_channel_credit == 0
1306 {
1307 return Err(SessionError::Protocol(
1308 "initial_channel_credit must be greater than zero".into(),
1309 ));
1310 }
1311
1312 if handshake_result.our_settings != *expected_root_settings {
1313 return Err(SessionError::Protocol(
1314 "negotiated root settings do not match builder settings".into(),
1315 ));
1316 }
1317 Ok(())
1318}
1319
1320fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1321 match error {
1322 crate::HandshakeError::Io(io) => SessionError::Io(io),
1323 crate::HandshakeError::PeerClosed => {
1324 SessionError::Protocol("peer closed during handshake".into())
1325 }
1326 crate::HandshakeError::NotResumable => SessionError::NotResumable,
1327 other => SessionError::Protocol(other.to_string()),
1328 }
1329}
1330
1331fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1332 match error {
1333 crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1334 crate::TransportPrologueError::LinkDead => {
1335 SessionError::Protocol("link closed during transport prologue".into())
1336 }
1337 crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1338 crate::TransportPrologueError::Rejected(reason) => {
1339 SessionError::Protocol(format!("transport rejected: {reason}"))
1340 }
1341 }
1342}