1use std::{
2 collections::HashMap,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use moire::sync::mpsc;
10use moire::time;
11use vox_types::{
12 Conduit, ConnectionSettings, HandshakeResult, Link, MaybeSend, MaybeSync, MessageFamily,
13 Metadata, Parity, SessionResumeKey, SplitLink, metadata_into_owned,
14};
15
16use crate::{Attachment, LinkSource, StableConduit};
17use crate::{
18 BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
19 handshake_as_acceptor, handshake_as_initiator, initiate_transport,
20};
21
22use super::{
23 CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
24 SessionHandle, SessionKeepaliveConfig,
25};
26use crate::FromVoxSession;
27
28pub const VOX_SERVICE_METADATA_KEY: &str = "vox-service";
30
31fn inject_service_metadata<Client: FromVoxSession>(metadata: &mut Metadata<'_>) {
33 metadata.push(vox_types::MetadataEntry {
34 key: VOX_SERVICE_METADATA_KEY.into(),
35 value: vox_types::MetadataValue::String(Client::SERVICE_NAME.into()),
36 flags: vox_types::MetadataFlags::NONE,
37 });
38}
39
40#[cfg(not(target_arch = "wasm32"))]
43pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
44#[cfg(target_arch = "wasm32")]
45pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
46
47#[cfg(not(target_arch = "wasm32"))]
48type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
49#[cfg(target_arch = "wasm32")]
50type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
51
52#[cfg(not(target_arch = "wasm32"))]
53fn default_spawn_fn() -> SpawnFn {
54 Box::new(|fut| {
55 tokio::spawn(fut);
56 })
57}
58
59#[cfg(target_arch = "wasm32")]
60fn default_spawn_fn() -> SpawnFn {
61 Box::new(|fut| {
62 wasm_bindgen_futures::spawn_local(fut);
63 })
64}
65
66pub fn initiator_conduit<I: IntoConduit>(
69 into_conduit: I,
70 handshake_result: HandshakeResult,
71) -> SessionInitiatorBuilder<'static, I::Conduit> {
72 SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
73}
74
75pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
76where
77 S: LinkSource,
78{
79 SessionSourceInitiatorBuilder::new(source, mode)
80}
81
82pub fn acceptor_conduit<I: IntoConduit>(
83 into_conduit: I,
84 handshake_result: HandshakeResult,
85) -> SessionAcceptorBuilder<'static, I::Conduit> {
86 SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
87}
88
89pub async fn initiator_on_link<L: Link>(
92 link: L,
93 settings: ConnectionSettings,
94) -> Result<
95 SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
96 SessionError,
97>
98where
99 L::Tx: MaybeSend + MaybeSync + 'static,
100 L::Rx: MaybeSend + 'static,
101{
102 let (tx, mut rx) = link.split();
103 let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None, vec![])
104 .await
105 .map_err(session_error_from_handshake)?;
106 let message_plan =
107 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
108 Ok(SessionInitiatorBuilder::new(
109 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
110 handshake_result,
111 ))
112}
113
114pub async fn acceptor_on_link<L: Link>(
117 link: L,
118 settings: ConnectionSettings,
119) -> Result<
120 SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
121 SessionError,
122>
123where
124 L::Tx: MaybeSend + MaybeSync + 'static,
125 L::Rx: MaybeSend + 'static,
126{
127 let (tx, mut rx) = link.split();
128 let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None, vec![])
129 .await
130 .map_err(session_error_from_handshake)?;
131 let message_plan =
132 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
133 Ok(SessionAcceptorBuilder::new(
134 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
135 handshake_result,
136 ))
137}
138
139pub fn initiator_on<L: Link>(
140 link: L,
141 mode: TransportMode,
142) -> SessionTransportInitiatorBuilder<'static, L> {
143 SessionTransportInitiatorBuilder::new(link, mode)
144}
145
146pub fn initiator_transport<L: Link>(
147 link: L,
148 mode: TransportMode,
149) -> SessionTransportInitiatorBuilder<'static, L> {
150 initiator_on(link, mode)
151}
152
153pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
154 SessionTransportAcceptorBuilder::new(link)
155}
156
157pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
158 acceptor_on(link)
159}
160
161#[derive(Clone, Default)]
162pub struct SessionRegistry {
163 inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
164}
165
166impl SessionRegistry {
167 fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
168 self.inner
169 .lock()
170 .expect("session registry poisoned")
171 .get(key)
172 .cloned()
173 }
174
175 fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
176 self.inner
177 .lock()
178 .expect("session registry poisoned")
179 .insert(key, handle);
180 }
181
182 fn remove(&self, key: &SessionResumeKey) {
183 self.inner
184 .lock()
185 .expect("session registry poisoned")
186 .remove(key);
187 }
188}
189
190pub enum SessionAcceptOutcome<Client> {
191 Established(Client),
192 Resumed,
193}
194
195pub struct SessionConfig<'a> {
197 pub root_settings: ConnectionSettings,
198 pub metadata: Metadata<'a>,
199 pub on_connection: Option<Arc<dyn ConnectionAcceptor>>,
200 pub keepalive: Option<SessionKeepaliveConfig>,
201 pub resumable: bool,
202 pub session_registry: Option<SessionRegistry>,
203 pub operation_store: Option<Arc<dyn OperationStore>>,
204 pub spawn_fn: SpawnFn,
205 pub connect_timeout: Option<std::time::Duration>,
206 pub recovery_timeout: Option<std::time::Duration>,
207}
208
209impl SessionConfig<'_> {
210 fn with_settings(root_settings: ConnectionSettings) -> Self {
211 Self {
212 root_settings,
213 metadata: vec![],
214 on_connection: None,
215 keepalive: None,
216 resumable: true,
217 session_registry: None,
218 operation_store: None,
219 spawn_fn: default_spawn_fn(),
220 connect_timeout: None,
221 recovery_timeout: None,
222 }
223 }
224}
225
226impl Default for SessionConfig<'_> {
227 fn default() -> Self {
228 Self::with_settings(ConnectionSettings {
229 parity: Parity::Odd,
230 max_concurrent_requests: 64,
231 })
232 }
233}
234
235pub struct SessionInitiatorBuilder<'a, C> {
236 conduit: C,
237 handshake_result: HandshakeResult,
238 config: SessionConfig<'a>,
239 recoverer: Option<Box<dyn ConduitRecoverer>>,
240}
241
242impl<'a, C> SessionInitiatorBuilder<'a, C> {
243 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
244 let root_settings = handshake_result.our_settings.clone();
245 let mut config = SessionConfig::with_settings(root_settings);
246 config.resumable = false;
248 Self {
249 conduit,
250 handshake_result,
251 config,
252 recoverer: None,
253 }
254 }
255
256 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
257 self.config.on_connection = Some(Arc::new(acceptor));
258 self
259 }
260
261 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
262 self.config.keepalive = Some(keepalive);
263 self
264 }
265
266 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
267 self.config.connect_timeout = Some(timeout);
268 self
269 }
270
271 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
272 self.config.recovery_timeout = Some(timeout);
273 self
274 }
275
276 pub fn resumable(mut self) -> Self {
277 self.config.resumable = true;
278 self
279 }
280
281 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
282 self.config.operation_store = Some(operation_store);
283 self
284 }
285
286 #[cfg(not(target_arch = "wasm32"))]
289 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
290 self.config.spawn_fn = Box::new(f);
291 self
292 }
293
294 #[cfg(target_arch = "wasm32")]
297 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
298 self.config.spawn_fn = Box::new(f);
299 self
300 }
301
302 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
307 where
308 C: Conduit<Msg = MessageFamily> + 'static,
309 C::Tx: MaybeSend + MaybeSync + 'static,
310 C::Rx: MaybeSend + 'static,
311 {
312 let Self {
313 conduit,
314 mut handshake_result,
315 config,
316 recoverer,
317 } = self;
318 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
319 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
320 let (tx, rx) = conduit.split();
321 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
322 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
323 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
324 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
325 let acceptor: Arc<dyn ConnectionAcceptor> =
326 config.on_connection.unwrap_or_else(|| Arc::new(()));
327 let mut session = Session::pre_handshake(
328 tx,
329 rx,
330 Some(acceptor.clone()),
331 open_rx,
332 close_rx,
333 resume_rx,
334 control_tx.clone(),
335 control_rx,
336 config.keepalive,
337 config.resumable,
338 recoverer,
339 config.recovery_timeout,
340 );
341 let handle = session.establish_from_handshake(handshake_result)?;
342 let resume_key = session.resume_key();
343 let session_handle = SessionHandle {
344 open_tx,
345 close_tx,
346 resume_tx,
347 control_tx,
348 resume_key,
349 };
350 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
352 let pending = super::PendingConnection::with_caller_slot(
353 handle,
354 caller_slot.clone(),
355 config.operation_store,
356 );
357 peer_metadata.push(vox_types::MetadataEntry::str(
358 VOX_SERVICE_METADATA_KEY,
359 Client::SERVICE_NAME,
360 ));
361 let request = super::ConnectionRequest::new(&peer_metadata)?;
362 acceptor
363 .accept(&request, pending)
364 .map_err(SessionError::Rejected)?;
365 let caller =
366 caller_slot.lock().unwrap().take().expect(
367 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
368 );
369 let client = Client::from_vox_session(caller, Some(session_handle));
370 (config.spawn_fn)(Box::pin(async move { session.run().await }));
371 Ok(client)
372 }
373}
374
375pub struct SessionSourceInitiatorBuilder<'a, S> {
376 source: S,
377 mode: TransportMode,
378 config: SessionConfig<'a>,
379}
380
381impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
382 fn new(source: S, mode: TransportMode) -> Self {
383 let config = SessionConfig {
384 resumable: false,
385 ..SessionConfig::default()
386 };
387 Self {
388 source,
389 mode,
390 config,
391 }
392 }
393
394 pub fn parity(mut self, parity: Parity) -> Self {
395 self.config.root_settings.parity = parity;
396 self
397 }
398
399 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
400 self.config.root_settings = settings;
401 self
402 }
403
404 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
405 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
406 self
407 }
408
409 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
410 self.config.metadata = metadata;
411 self
412 }
413
414 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
415 self.config.on_connection = Some(Arc::new(acceptor));
416 self
417 }
418
419 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
420 self.config.keepalive = Some(keepalive);
421 self
422 }
423
424 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
425 self.config.connect_timeout = Some(timeout);
426 self
427 }
428
429 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
430 self.config.recovery_timeout = Some(timeout);
431 self
432 }
433
434 pub fn resumable(mut self) -> Self {
435 self.config.resumable = true;
436 self
437 }
438
439 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
440 self.config.operation_store = Some(operation_store);
441 self
442 }
443
444 #[cfg(not(target_arch = "wasm32"))]
445 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
446 self.config.spawn_fn = Box::new(f);
447 self
448 }
449
450 #[cfg(target_arch = "wasm32")]
451 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
452 self.config.spawn_fn = Box::new(f);
453 self
454 }
455
456 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
457 where
458 S: LinkSource,
459 S::Link: Link + MaybeSend + 'static,
460 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
461 <S::Link as Link>::Rx: MaybeSend + 'static,
462 {
463 let connect_timeout = self.config.connect_timeout;
464 let fut = self.establish_inner::<Client>();
465 match connect_timeout {
466 Some(timeout) => time::timeout(timeout, fut)
467 .await
468 .map_err(|_| SessionError::ConnectTimeout)?,
469 None => fut.await,
470 }
471 }
472
473 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
474 where
475 S: LinkSource,
476 S::Link: Link + MaybeSend + 'static,
477 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
478 <S::Link as Link>::Rx: MaybeSend + 'static,
479 {
480 let Self {
481 mut source,
482 mode,
483 mut config,
484 } = self;
485 inject_service_metadata::<Client>(&mut config.metadata);
486
487 match mode {
488 TransportMode::Bare => {
489 let attachment = source.next_link().await.map_err(SessionError::Io)?;
490 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
491 .await
492 .map_err(session_error_from_transport)?;
493 let handshake_result = handshake_as_initiator(
494 &link.tx,
495 &mut link.rx,
496 config.root_settings.clone(),
497 true,
498 None,
499 metadata_into_owned(config.metadata.clone()),
500 )
501 .await
502 .map_err(session_error_from_handshake)?;
503 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
504 .map_err(SessionError::Protocol)?;
505 let builder = SessionInitiatorBuilder::new(
506 BareConduit::with_message_plan(link, message_plan),
507 handshake_result,
508 );
509 let recoverer = Box::new(BareSourceRecoverer {
510 source,
511 settings: config.root_settings.clone(),
512 connect_timeout: config.connect_timeout,
513 metadata: metadata_into_owned(config.metadata.clone()),
514 });
515 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
516 builder,
517 config,
518 Some(recoverer),
519 )
520 .establish()
521 .await
522 }
523 TransportMode::Stable => {
524 let attachment = source.next_link().await.map_err(SessionError::Io)?;
525 let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
526 .await
527 .map_err(session_error_from_transport)?;
528 let handshake_result = handshake_as_initiator(
529 &link.tx,
530 &mut link.rx,
531 config.root_settings.clone(),
532 true,
533 None,
534 metadata_into_owned(config.metadata.clone()),
535 )
536 .await
537 .map_err(session_error_from_handshake)?;
538 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
539 .map_err(SessionError::Protocol)?;
540 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
541 link.tx,
542 link.rx,
543 None,
544 TransportedLinkSource {
545 source,
546 mode: TransportMode::Stable,
547 },
548 )
549 .await
550 .map_err(|error| {
551 SessionError::Protocol(format!("stable conduit setup failed: {error}"))
552 })?
553 .with_message_plan(message_plan);
554 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
555 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
556 builder, config, None,
557 )
558 .establish()
559 .await
560 }
561 }
562 }
563}
564
565pub struct SessionTransportInitiatorBuilder<'a, L> {
566 link: L,
567 mode: TransportMode,
568 config: SessionConfig<'a>,
569}
570
571impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
572 fn new(link: L, mode: TransportMode) -> Self {
573 let config = SessionConfig {
574 resumable: false,
575 ..SessionConfig::default()
576 };
577 Self { link, mode, config }
578 }
579
580 pub fn parity(mut self, parity: Parity) -> Self {
581 self.config.root_settings.parity = parity;
582 self
583 }
584
585 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
586 self.config.root_settings = settings;
587 self
588 }
589
590 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
591 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
592 self
593 }
594
595 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
596 self.config.metadata = metadata;
597 self
598 }
599
600 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
601 self.config.on_connection = Some(Arc::new(acceptor));
602 self
603 }
604
605 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
606 self.config.keepalive = Some(keepalive);
607 self
608 }
609
610 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
611 self.config.connect_timeout = Some(timeout);
612 self
613 }
614
615 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
616 self.config.recovery_timeout = Some(timeout);
617 self
618 }
619
620 pub fn resumable(mut self) -> Self {
621 self.config.resumable = true;
622 self
623 }
624
625 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
626 self.config.operation_store = Some(operation_store);
627 self
628 }
629
630 #[cfg(not(target_arch = "wasm32"))]
631 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
632 self.config.spawn_fn = Box::new(f);
633 self
634 }
635
636 #[cfg(target_arch = "wasm32")]
637 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
638 self.config.spawn_fn = Box::new(f);
639 self
640 }
641
642 #[cfg(not(target_arch = "wasm32"))]
643 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
644 where
645 L: Link + Send + 'static,
646 L::Tx: MaybeSend + MaybeSync + 'static,
647 L::Rx: MaybeSend + 'static,
648 {
649 let connect_timeout = self.config.connect_timeout;
650 let fut = self.establish_inner::<Client>();
651 match connect_timeout {
652 Some(timeout) => tokio::time::timeout(timeout, fut)
653 .await
654 .map_err(|_| SessionError::ConnectTimeout)?,
655 None => fut.await,
656 }
657 }
658
659 #[cfg(not(target_arch = "wasm32"))]
660 async fn establish_inner<Client: FromVoxSession>(self) -> Result<Client, SessionError>
661 where
662 L: Link + Send + 'static,
663 L::Tx: MaybeSend + MaybeSync + 'static,
664 L::Rx: MaybeSend + 'static,
665 {
666 let Self {
667 link,
668 mode,
669 mut config,
670 } = self;
671 inject_service_metadata::<Client>(&mut config.metadata);
672 match mode {
673 TransportMode::Bare => {
674 let link = initiate_transport(link, TransportMode::Bare)
675 .await
676 .map_err(session_error_from_transport)?;
677 Self::finish_with_bare_parts(link, config).await
678 }
679 TransportMode::Stable => {
680 let link = initiate_transport(link, TransportMode::Stable)
681 .await
682 .map_err(session_error_from_transport)?;
683 Self::finish_with_stable_parts(link, config).await
684 }
685 }
686 }
687
688 #[cfg(target_arch = "wasm32")]
689 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
690 where
691 L: Link + 'static,
692 L::Tx: MaybeSend + MaybeSync + 'static,
693 L::Rx: MaybeSend + 'static,
694 {
695 let Self {
696 link,
697 mode,
698 mut config,
699 } = self;
700 inject_service_metadata::<Client>(&mut config.metadata);
701 match mode {
702 TransportMode::Bare => {
703 let link = initiate_transport(link, TransportMode::Bare)
704 .await
705 .map_err(session_error_from_transport)?;
706 Self::finish_with_bare_parts(link, config).await
707 }
708 TransportMode::Stable => Err(SessionError::Protocol(
709 "stable conduit transport selection is unsupported on wasm".into(),
710 )),
711 }
712 }
713
714 async fn finish_with_bare_parts<Client: FromVoxSession>(
715 mut link: SplitLink<L::Tx, L::Rx>,
716 config: SessionConfig<'a>,
717 ) -> Result<Client, SessionError>
718 where
719 L: Link + 'static,
720 L::Tx: MaybeSend + MaybeSync + 'static,
721 L::Rx: MaybeSend + 'static,
722 {
723 let handshake_result = handshake_as_initiator(
724 &link.tx,
725 &mut link.rx,
726 config.root_settings.clone(),
727 true,
728 None,
729 metadata_into_owned(config.metadata.clone()),
730 )
731 .await
732 .map_err(session_error_from_handshake)?;
733 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
734 .map_err(SessionError::Protocol)?;
735 let builder = SessionInitiatorBuilder::new(
736 BareConduit::with_message_plan(link, message_plan),
737 handshake_result,
738 );
739 Self::apply_common_parts(builder, config, None)
740 .establish()
741 .await
742 }
743
744 #[cfg(not(target_arch = "wasm32"))]
745 async fn finish_with_stable_parts<Client: FromVoxSession>(
746 mut link: SplitLink<L::Tx, L::Rx>,
747 config: SessionConfig<'a>,
748 ) -> Result<Client, SessionError>
749 where
750 L: Link + Send + 'static,
751 L::Tx: MaybeSend + MaybeSync + Send + 'static,
752 L::Rx: MaybeSend + Send + 'static,
753 {
754 let handshake_result = handshake_as_initiator(
755 &link.tx,
756 &mut link.rx,
757 config.root_settings.clone(),
758 true,
759 None,
760 metadata_into_owned(config.metadata.clone()),
761 )
762 .await
763 .map_err(session_error_from_handshake)?;
764 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
765 .map_err(SessionError::Protocol)?;
766 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
767 link.tx,
768 link.rx,
769 None,
770 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
771 )
772 .await
773 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
774 .with_message_plan(message_plan);
775 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
776 Self::apply_common_parts(builder, config, None)
777 .establish()
778 .await
779 }
780
781 #[allow(clippy::too_many_arguments)]
782 fn apply_common_parts<C>(
783 mut builder: SessionInitiatorBuilder<'a, C>,
784 config: SessionConfig<'a>,
785 recoverer: Option<Box<dyn ConduitRecoverer>>,
786 ) -> SessionInitiatorBuilder<'a, C> {
787 builder.config = config;
788 builder.recoverer = recoverer;
789 builder
790 }
791}
792
793struct BareSourceRecoverer<S> {
794 source: S,
795 settings: ConnectionSettings,
796 connect_timeout: Option<Duration>,
797 metadata: Metadata<'static>,
798}
799
800const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
801const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
802
803impl<S> ConduitRecoverer for BareSourceRecoverer<S>
804where
805 S: LinkSource,
806 S::Link: Link + MaybeSend + 'static,
807 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
808 <S::Link as Link>::Rx: MaybeSend + 'static,
809{
810 fn next_conduit<'a>(
811 &'a mut self,
812 resume_key: Option<&'a SessionResumeKey>,
813 ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
814 Box::pin(async move {
815 let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
816 let mut use_resume_key = resume_key.is_some();
817
818 loop {
819 let selected_resume_key = if use_resume_key { resume_key } else { None };
820
821 let attempt = async {
822 let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
823 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
824 .await
825 .map_err(session_error_from_transport)?;
826 let handshake_result = handshake_as_initiator(
827 &link.tx,
828 &mut link.rx,
829 self.settings.clone(),
830 true,
831 selected_resume_key,
832 metadata_into_owned(self.metadata.clone()),
833 )
834 .await
835 .map_err(session_error_from_handshake)?;
836 let conduit = BareConduit::<MessageFamily, _>::new(link);
837 let (tx, rx) = conduit.split();
838 Ok(super::RecoveredConduit {
839 tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
840 rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
841 handshake: handshake_result,
842 })
843 };
844
845 let result = match self.connect_timeout {
846 Some(timeout) => match time::timeout(timeout, attempt).await {
847 Ok(r) => r,
848 Err(_) => Err(SessionError::ConnectTimeout),
849 },
850 None => attempt.await,
851 };
852
853 match result {
854 Ok(conduit) => return Ok(conduit),
855 Err(e) if !e.is_retryable() => return Err(e),
856 Err(_) => {}
857 }
858
859 if use_resume_key {
860 use_resume_key = false;
863 }
864
865 time::sleep(backoff).await;
866 backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
867 }
868 })
869 }
870}
871
872struct TransportedLinkSource<S> {
873 source: S,
874 mode: TransportMode,
875}
876
877impl<S> LinkSource for TransportedLinkSource<S>
878where
879 S: LinkSource,
880 S::Link: Link + MaybeSend + 'static,
881 <S::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
882 <S::Link as Link>::Rx: MaybeSend + 'static,
883{
884 type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
885
886 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
887 let attachment = self.source.next_link().await?;
888 let link = initiate_transport(attachment.into_link(), self.mode)
889 .await
890 .map_err(std::io::Error::other)?;
891 Ok(Attachment::initiator(link))
892 }
893}
894
895pub struct SessionAcceptorBuilder<'a, C> {
896 conduit: C,
897 handshake_result: HandshakeResult,
898 config: SessionConfig<'a>,
899}
900
901impl<'a, C> SessionAcceptorBuilder<'a, C> {
902 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
903 let root_settings = handshake_result.our_settings.clone();
904 let mut config = SessionConfig::with_settings(root_settings);
905 config.resumable = false;
907 Self {
908 conduit,
909 handshake_result,
910 config,
911 }
912 }
913
914 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
915 self.config.on_connection = Some(Arc::new(acceptor));
916 self
917 }
918
919 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
920 self.config.keepalive = Some(keepalive);
921 self
922 }
923
924 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
925 self.config.connect_timeout = Some(timeout);
926 self
927 }
928
929 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
930 self.config.recovery_timeout = Some(timeout);
931 self
932 }
933
934 pub fn resumable(mut self) -> Self {
935 self.config.resumable = true;
936 self
937 }
938
939 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
940 self.config.session_registry = Some(session_registry);
941 self
942 }
943
944 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
945 self.config.operation_store = Some(operation_store);
946 self
947 }
948
949 #[cfg(not(target_arch = "wasm32"))]
952 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
953 self.config.spawn_fn = Box::new(f);
954 self
955 }
956
957 #[cfg(target_arch = "wasm32")]
960 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
961 self.config.spawn_fn = Box::new(f);
962 self
963 }
964
965 #[moire::instrument]
966 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
967 where
968 C: Conduit<Msg = MessageFamily> + 'static,
969 C::Tx: MaybeSend + MaybeSync + 'static,
970 C::Rx: MaybeSend + 'static,
971 {
972 let Self {
973 conduit,
974 mut handshake_result,
975 config,
976 } = self;
977 validate_negotiated_root_settings(&config.root_settings, &handshake_result)?;
978 let mut peer_metadata = std::mem::take(&mut handshake_result.peer_metadata);
979 let (tx, rx) = conduit.split();
980 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
981 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
982 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
983 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
984 let acceptor: Arc<dyn ConnectionAcceptor> =
985 config.on_connection.unwrap_or_else(|| Arc::new(()));
986 let mut session = Session::pre_handshake(
987 tx,
988 rx,
989 Some(acceptor.clone()),
990 open_rx,
991 close_rx,
992 resume_rx,
993 control_tx.clone(),
994 control_rx,
995 config.keepalive,
996 config.resumable,
997 None,
998 config.recovery_timeout,
999 );
1000 let handle = session.establish_from_handshake(handshake_result)?;
1001 let resume_key = session.resume_key();
1002 let session_handle = SessionHandle {
1003 open_tx,
1004 close_tx,
1005 resume_tx,
1006 control_tx,
1007 resume_key,
1008 };
1009 if let (Some(registry), Some(key)) = (&config.session_registry, resume_key) {
1010 registry.insert(key, session_handle.clone());
1011 session.registered_in_registry = true;
1012 }
1013 let caller_slot = Arc::new(std::sync::Mutex::new(None::<crate::Caller>));
1015 let pending = super::PendingConnection::with_caller_slot(
1016 handle,
1017 caller_slot.clone(),
1018 config.operation_store,
1019 );
1020 peer_metadata.push(vox_types::MetadataEntry::str(
1021 VOX_SERVICE_METADATA_KEY,
1022 Client::SERVICE_NAME,
1023 ));
1024 let request = super::ConnectionRequest::new(&peer_metadata)?;
1025 acceptor
1026 .accept(&request, pending)
1027 .map_err(SessionError::Rejected)?;
1028 let caller =
1029 caller_slot.lock().unwrap().take().expect(
1030 "root connection acceptor must call handle_with (not into_handle or proxy_to)",
1031 );
1032 let client = Client::from_vox_session(caller, Some(session_handle));
1033 (config.spawn_fn)(Box::pin(async move { session.run().await }));
1034 Ok(client)
1035 }
1036
1037 #[moire::instrument]
1038 pub async fn establish_or_resume<Client: FromVoxSession>(
1039 self,
1040 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1041 where
1042 C: Conduit<Msg = MessageFamily> + 'static,
1043 C::Tx: MaybeSend + MaybeSync + 'static,
1044 C::Rx: MaybeSend + 'static,
1045 {
1046 if let (Some(registry), Some(resume_key)) = (
1050 &self.config.session_registry,
1051 self.handshake_result.peer_resume_key,
1052 ) && let Some(handle) = registry.get(&resume_key)
1053 {
1054 let (tx, rx) = self.conduit.split();
1055 if let Err(error) = handle
1056 .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1057 .await
1058 {
1059 registry.remove(&resume_key);
1060 return Err(error);
1061 }
1062 return Ok(SessionAcceptOutcome::Resumed);
1063 }
1064
1065 let client = self.establish().await?;
1066 Ok(SessionAcceptOutcome::Established(client))
1067 }
1068}
1069
1070pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1071 link: L,
1072 config: SessionConfig<'a>,
1073}
1074
1075impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1076 fn new(link: L) -> Self {
1077 Self {
1078 link,
1079 config: SessionConfig::with_settings(ConnectionSettings {
1080 parity: Parity::Even,
1081 max_concurrent_requests: 64,
1082 }),
1083 }
1084 }
1085
1086 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1087 self.config.root_settings = settings;
1088 self
1089 }
1090
1091 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1092 self.config.root_settings.max_concurrent_requests = max_concurrent_requests;
1093 self
1094 }
1095
1096 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1097 self.config.metadata = metadata;
1098 self
1099 }
1100
1101 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1102 self.config.on_connection = Some(Arc::new(acceptor));
1103 self
1104 }
1105
1106 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1107 self.config.keepalive = Some(keepalive);
1108 self
1109 }
1110
1111 pub fn connect_timeout(mut self, timeout: std::time::Duration) -> Self {
1112 self.config.connect_timeout = Some(timeout);
1113 self
1114 }
1115
1116 pub fn recovery_timeout(mut self, timeout: std::time::Duration) -> Self {
1117 self.config.recovery_timeout = Some(timeout);
1118 self
1119 }
1120
1121 pub fn resumable(mut self) -> Self {
1122 self.config.resumable = true;
1123 self
1124 }
1125
1126 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1127 self.config.session_registry = Some(session_registry);
1128 self
1129 }
1130
1131 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1132 self.config.operation_store = Some(operation_store);
1133 self
1134 }
1135
1136 #[cfg(not(target_arch = "wasm32"))]
1137 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1138 self.config.spawn_fn = Box::new(f);
1139 self
1140 }
1141
1142 #[cfg(target_arch = "wasm32")]
1143 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1144 self.config.spawn_fn = Box::new(f);
1145 self
1146 }
1147
1148 #[moire::instrument]
1149 pub async fn establish<Client: FromVoxSession>(self) -> Result<Client, SessionError>
1150 where
1151 L: Link + MaybeSend + 'static,
1152 L::Tx: MaybeSend + MaybeSync + 'static,
1153 L::Rx: MaybeSend + 'static,
1154 {
1155 let Self { link, mut config } = self;
1156 inject_service_metadata::<Client>(&mut config.metadata);
1157 let (mode, mut link) = accept_transport(link)
1158 .await
1159 .map_err(session_error_from_transport)?;
1160 match mode {
1161 TransportMode::Bare => {
1162 let handshake_result = handshake_as_acceptor(
1163 &link.tx,
1164 &mut link.rx,
1165 config.root_settings.clone(),
1166 true,
1167 config.resumable,
1168 None,
1169 metadata_into_owned(config.metadata.clone()),
1170 )
1171 .await
1172 .map_err(session_error_from_handshake)?;
1173 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1174 .map_err(SessionError::Protocol)?;
1175 let builder = SessionAcceptorBuilder::new(
1176 BareConduit::with_message_plan(link, message_plan),
1177 handshake_result,
1178 );
1179 Self::apply_common_parts(builder, config).establish().await
1180 }
1181 TransportMode::Stable => Self::finish_with_stable_parts(link, config).await,
1182 }
1183 }
1184
1185 #[moire::instrument]
1186 pub async fn establish_or_resume<Client: FromVoxSession>(
1187 self,
1188 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1189 where
1190 L: Link + MaybeSend + 'static,
1191 L::Tx: MaybeSend + MaybeSync + 'static,
1192 L::Rx: MaybeSend + 'static,
1193 {
1194 let Self { link, config } = self;
1195 let (mode, mut link) = accept_transport(link)
1196 .await
1197 .map_err(session_error_from_transport)?;
1198 match mode {
1199 TransportMode::Bare => {
1200 let handshake_result = handshake_as_acceptor(
1201 &link.tx,
1202 &mut link.rx,
1203 config.root_settings.clone(),
1204 true,
1205 config.resumable,
1206 None,
1207 metadata_into_owned(config.metadata.clone()),
1208 )
1209 .await
1210 .map_err(session_error_from_handshake)?;
1211 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1212 .map_err(SessionError::Protocol)?;
1213 let builder = SessionAcceptorBuilder::new(
1214 BareConduit::with_message_plan(link, message_plan),
1215 handshake_result,
1216 );
1217 Self::apply_common_parts(builder, config)
1218 .establish_or_resume()
1219 .await
1220 }
1221 TransportMode::Stable => Self::finish_with_stable_parts(link, config)
1222 .await
1223 .map(SessionAcceptOutcome::Established),
1224 }
1225 }
1226
1227 async fn finish_with_stable_parts<Client: FromVoxSession>(
1228 mut link: SplitLink<L::Tx, L::Rx>,
1229 config: SessionConfig<'a>,
1230 ) -> Result<Client, SessionError>
1231 where
1232 L: Link + MaybeSend + 'static,
1233 L::Tx: MaybeSend + MaybeSync + 'static,
1234 L::Rx: MaybeSend + 'static,
1235 {
1236 let handshake_result = handshake_as_acceptor(
1237 &link.tx,
1238 &mut link.rx,
1239 config.root_settings.clone(),
1240 true,
1241 config.resumable,
1242 None,
1243 metadata_into_owned(config.metadata.clone()),
1244 )
1245 .await
1246 .map_err(session_error_from_handshake)?;
1247 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1248 .map_err(SessionError::Protocol)?;
1249 let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1250 .await
1251 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1252 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1253 link.tx,
1254 link.rx,
1255 Some(client_hello),
1256 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1257 )
1258 .await
1259 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1260 .with_message_plan(message_plan);
1261 let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1262 Self::apply_common_parts(builder, config).establish().await
1263 }
1264
1265 fn apply_common_parts<C>(
1266 mut builder: SessionAcceptorBuilder<'a, C>,
1267 config: SessionConfig<'a>,
1268 ) -> SessionAcceptorBuilder<'a, C> {
1269 builder.config = config;
1270 builder
1271 }
1272}
1273
1274fn validate_negotiated_root_settings(
1275 expected_root_settings: &ConnectionSettings,
1276 handshake_result: &HandshakeResult,
1277) -> Result<(), SessionError> {
1278 if handshake_result.our_settings != *expected_root_settings {
1279 return Err(SessionError::Protocol(
1280 "negotiated root settings do not match builder settings".into(),
1281 ));
1282 }
1283 Ok(())
1284}
1285
1286fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1287 match error {
1288 crate::HandshakeError::Io(io) => SessionError::Io(io),
1289 crate::HandshakeError::PeerClosed => {
1290 SessionError::Protocol("peer closed during handshake".into())
1291 }
1292 crate::HandshakeError::NotResumable => SessionError::NotResumable,
1293 other => SessionError::Protocol(other.to_string()),
1294 }
1295}
1296
1297fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1298 match error {
1299 crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1300 crate::TransportPrologueError::LinkDead => {
1301 SessionError::Protocol("link closed during transport prologue".into())
1302 }
1303 crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1304 crate::TransportPrologueError::Rejected(reason) => {
1305 SessionError::Protocol(format!("transport rejected: {reason}"))
1306 }
1307 }
1308}