1use std::{
2 collections::HashMap,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use moire::sync::mpsc;
10use vox_types::{
11 Conduit, ConduitTx, ConnectionSettings, Handler, HandshakeResult, Link, MaybeSend, MaybeSync,
12 MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink,
13};
14
15#[cfg(not(target_arch = "wasm32"))]
16use crate::{Attachment, LinkSource, StableConduit};
17use crate::{
18 BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
19 handshake_as_acceptor, handshake_as_initiator, initiate_transport,
20};
21
22use super::{
23 CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
24 SessionHandle, SessionKeepaliveConfig,
25};
26use crate::{Driver, DriverCaller, DriverReplySink};
27
28#[cfg(not(target_arch = "wasm32"))]
31pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
32#[cfg(target_arch = "wasm32")]
33pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
34
35#[cfg(not(target_arch = "wasm32"))]
36type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
37#[cfg(target_arch = "wasm32")]
38type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
39
40#[cfg(not(target_arch = "wasm32"))]
41fn default_spawn_fn() -> SpawnFn {
42 Box::new(|fut| {
43 tokio::spawn(fut);
44 })
45}
46
47#[cfg(target_arch = "wasm32")]
48fn default_spawn_fn() -> SpawnFn {
49 Box::new(|fut| {
50 wasm_bindgen_futures::spawn_local(fut);
51 })
52}
53
54pub fn initiator_conduit<I: IntoConduit>(
57 into_conduit: I,
58 handshake_result: HandshakeResult,
59) -> SessionInitiatorBuilder<'static, I::Conduit> {
60 SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
61}
62
63#[cfg(not(target_arch = "wasm32"))]
64pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
65where
66 S: LinkSource,
67{
68 SessionSourceInitiatorBuilder::new(source, mode)
69}
70
71pub fn acceptor_conduit<I: IntoConduit>(
72 into_conduit: I,
73 handshake_result: HandshakeResult,
74) -> SessionAcceptorBuilder<'static, I::Conduit> {
75 SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
76}
77
78pub async fn initiator_on_link<L: Link>(
81 link: L,
82 settings: ConnectionSettings,
83) -> Result<
84 SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
85 SessionError,
86>
87where
88 L::Tx: MaybeSend + MaybeSync + 'static,
89 L::Rx: MaybeSend + 'static,
90{
91 let (tx, mut rx) = link.split();
92 let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None)
93 .await
94 .map_err(session_error_from_handshake)?;
95 let message_plan =
96 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
97 Ok(SessionInitiatorBuilder::new(
98 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
99 handshake_result,
100 ))
101}
102
103pub async fn acceptor_on_link<L: Link>(
106 link: L,
107 settings: ConnectionSettings,
108) -> Result<
109 SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
110 SessionError,
111>
112where
113 L::Tx: MaybeSend + MaybeSync + 'static,
114 L::Rx: MaybeSend + 'static,
115{
116 let (tx, mut rx) = link.split();
117 let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None)
118 .await
119 .map_err(session_error_from_handshake)?;
120 let message_plan =
121 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
122 Ok(SessionAcceptorBuilder::new(
123 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
124 handshake_result,
125 ))
126}
127
128pub fn initiator_on<L: Link>(
129 link: L,
130 mode: TransportMode,
131) -> SessionTransportInitiatorBuilder<'static, L> {
132 SessionTransportInitiatorBuilder::new(link, mode)
133}
134
135pub fn initiator_transport<L: Link>(
136 link: L,
137 mode: TransportMode,
138) -> SessionTransportInitiatorBuilder<'static, L> {
139 initiator_on(link, mode)
140}
141
142pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
143 SessionTransportAcceptorBuilder::new(link)
144}
145
146pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
147 acceptor_on(link)
148}
149
150#[derive(Clone, Default)]
151pub struct SessionRegistry {
152 inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
153}
154
155impl SessionRegistry {
156 fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
157 self.inner
158 .lock()
159 .expect("session registry poisoned")
160 .get(key)
161 .cloned()
162 }
163
164 fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
165 self.inner
166 .lock()
167 .expect("session registry poisoned")
168 .insert(key, handle);
169 }
170
171 fn remove(&self, key: &SessionResumeKey) {
172 self.inner
173 .lock()
174 .expect("session registry poisoned")
175 .remove(key);
176 }
177}
178
179pub enum SessionAcceptOutcome<Client> {
180 Established(Client, SessionHandle),
181 Resumed,
182}
183
184pub struct SessionInitiatorBuilder<'a, C> {
185 conduit: C,
186 handshake_result: HandshakeResult,
187 root_settings: ConnectionSettings,
188 metadata: Metadata<'a>,
189 on_connection: Option<Box<dyn ConnectionAcceptor>>,
190 keepalive: Option<SessionKeepaliveConfig>,
191 resumable: bool,
192 recoverer: Option<Box<dyn ConduitRecoverer>>,
193 operation_store: Option<Arc<dyn OperationStore>>,
194 spawn_fn: SpawnFn,
195}
196
197impl<'a, C> SessionInitiatorBuilder<'a, C> {
198 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
199 let root_settings = handshake_result.our_settings.clone();
200 Self {
201 conduit,
202 handshake_result,
203 root_settings,
204 metadata: vec![],
205 on_connection: None,
206 keepalive: None,
207 resumable: false,
208 recoverer: None,
209 operation_store: None,
210 spawn_fn: default_spawn_fn(),
211 }
212 }
213
214 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
215 self.on_connection = Some(Box::new(acceptor));
216 self
217 }
218
219 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
220 self.keepalive = Some(keepalive);
221 self
222 }
223
224 pub fn resumable(mut self) -> Self {
225 self.resumable = true;
226 self
227 }
228
229 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
230 self.operation_store = Some(operation_store);
231 self
232 }
233
234 #[cfg(not(target_arch = "wasm32"))]
237 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
238 self.spawn_fn = Box::new(f);
239 self
240 }
241
242 #[cfg(target_arch = "wasm32")]
245 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
246 self.spawn_fn = Box::new(f);
247 self
248 }
249
250 pub async fn establish<Client: From<DriverCaller>>(
251 self,
252 handler: impl Handler<DriverReplySink> + 'static,
253 ) -> Result<(Client, SessionHandle), SessionError>
254 where
255 C: Conduit<Msg = MessageFamily> + 'static,
256 C::Tx: MaybeSend + MaybeSync + 'static,
257 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
258 C::Rx: MaybeSend + 'static,
259 {
260 let Self {
261 conduit,
262 handshake_result,
263 root_settings,
264 metadata: _metadata,
265 on_connection,
266 keepalive,
267 resumable,
268 recoverer,
269 operation_store,
270 spawn_fn,
271 } = self;
272 validate_negotiated_root_settings(&root_settings, &handshake_result)?;
273 let (tx, rx) = conduit.split();
274 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
275 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
276 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
277 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
278 let mut session = Session::pre_handshake(
279 tx,
280 rx,
281 on_connection,
282 open_rx,
283 close_rx,
284 resume_rx,
285 control_tx.clone(),
286 control_rx,
287 keepalive,
288 resumable,
289 recoverer,
290 );
291 let handle = session.establish_from_handshake(handshake_result)?;
292 let resume_key = session.resume_key();
293 let session_handle = SessionHandle {
294 open_tx,
295 close_tx,
296 resume_tx,
297 control_tx,
298 resume_key,
299 };
300 let mut driver = match operation_store {
301 Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
302 None => Driver::new(handle, handler),
303 };
304 let client = Client::from(driver.caller());
305 (spawn_fn)(Box::pin(async move { session.run().await }));
306 #[cfg(not(target_arch = "wasm32"))]
307 tokio::spawn(async move { driver.run().await });
308 #[cfg(target_arch = "wasm32")]
309 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
310 Ok((client, session_handle))
311 }
312}
313
314#[cfg(not(target_arch = "wasm32"))]
315pub struct SessionSourceInitiatorBuilder<'a, S> {
316 source: S,
317 mode: TransportMode,
318 root_settings: ConnectionSettings,
319 metadata: Metadata<'a>,
320 on_connection: Option<Box<dyn ConnectionAcceptor>>,
321 keepalive: Option<SessionKeepaliveConfig>,
322 resumable: bool,
323 operation_store: Option<Arc<dyn OperationStore>>,
324 spawn_fn: SpawnFn,
325}
326
327#[cfg(not(target_arch = "wasm32"))]
328impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
329 fn new(source: S, mode: TransportMode) -> Self {
330 Self {
331 source,
332 mode,
333 root_settings: ConnectionSettings {
334 parity: Parity::Odd,
335 max_concurrent_requests: 64,
336 },
337 metadata: vec![],
338 on_connection: None,
339 keepalive: None,
340 resumable: true,
341 operation_store: None,
342 spawn_fn: default_spawn_fn(),
343 }
344 }
345
346 pub fn parity(mut self, parity: Parity) -> Self {
347 self.root_settings.parity = parity;
348 self
349 }
350
351 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
352 self.root_settings = settings;
353 self
354 }
355
356 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
357 self.root_settings.max_concurrent_requests = max_concurrent_requests;
358 self
359 }
360
361 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
362 self.metadata = metadata;
363 self
364 }
365
366 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
367 self.on_connection = Some(Box::new(acceptor));
368 self
369 }
370
371 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
372 self.keepalive = Some(keepalive);
373 self
374 }
375
376 pub fn resumable(mut self) -> Self {
377 self.resumable = true;
378 self
379 }
380
381 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
382 self.operation_store = Some(operation_store);
383 self
384 }
385
386 #[cfg(not(target_arch = "wasm32"))]
387 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
388 self.spawn_fn = Box::new(f);
389 self
390 }
391
392 #[cfg(target_arch = "wasm32")]
393 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
394 self.spawn_fn = Box::new(f);
395 self
396 }
397
398 pub async fn establish<Client: From<DriverCaller>>(
399 self,
400 handler: impl Handler<DriverReplySink> + 'static,
401 ) -> Result<(Client, SessionHandle), SessionError>
402 where
403 S: LinkSource,
404 S::Link: Link + Send + 'static,
405 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
406 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
407 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
408 {
409 let Self {
410 mut source,
411 mode,
412 root_settings,
413 metadata,
414 on_connection,
415 keepalive,
416 resumable,
417 operation_store,
418 spawn_fn,
419 } = self;
420
421 match mode {
422 TransportMode::Bare => {
423 let attachment = source.next_link().await.map_err(SessionError::Io)?;
424 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
425 .await
426 .map_err(session_error_from_transport)?;
427 let handshake_result = handshake_as_initiator(
428 &link.tx,
429 &mut link.rx,
430 root_settings.clone(),
431 true,
432 None,
433 )
434 .await
435 .map_err(session_error_from_handshake)?;
436 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
437 .map_err(SessionError::Protocol)?;
438 let builder = SessionInitiatorBuilder::new(
439 BareConduit::with_message_plan(link, message_plan),
440 handshake_result,
441 );
442 let recoverer = Box::new(BareSourceRecoverer {
443 source,
444 settings: root_settings.clone(),
445 });
446 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
447 builder,
448 root_settings,
449 metadata,
450 on_connection,
451 keepalive,
452 resumable,
453 Some(recoverer),
454 operation_store,
455 spawn_fn,
456 )
457 .establish(handler)
458 .await
459 }
460 TransportMode::Stable => {
461 let attachment = source.next_link().await.map_err(SessionError::Io)?;
464 let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
465 .await
466 .map_err(session_error_from_transport)?;
467 let handshake_result = handshake_as_initiator(
468 &link.tx,
469 &mut link.rx,
470 root_settings.clone(),
471 true,
472 None,
473 )
474 .await
475 .map_err(session_error_from_handshake)?;
476 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
477 .map_err(SessionError::Protocol)?;
478 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
479 link.tx,
480 link.rx,
481 None, TransportedLinkSource {
483 source,
484 mode: TransportMode::Stable,
485 },
486 )
487 .await
488 .map_err(|error| {
489 SessionError::Protocol(format!("stable conduit setup failed: {error}"))
490 })?
491 .with_message_plan(message_plan);
492 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
493 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
494 builder,
495 root_settings,
496 metadata,
497 on_connection,
498 keepalive,
499 resumable,
500 None,
501 operation_store,
502 spawn_fn,
503 )
504 .establish(handler)
505 .await
506 }
507 }
508 }
509}
510
511pub struct SessionTransportInitiatorBuilder<'a, L> {
512 link: L,
513 mode: TransportMode,
514 root_settings: ConnectionSettings,
515 metadata: Metadata<'a>,
516 on_connection: Option<Box<dyn ConnectionAcceptor>>,
517 keepalive: Option<SessionKeepaliveConfig>,
518 resumable: bool,
519 operation_store: Option<Arc<dyn OperationStore>>,
520 spawn_fn: SpawnFn,
521}
522
523impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
524 fn new(link: L, mode: TransportMode) -> Self {
525 Self {
526 link,
527 mode,
528 root_settings: ConnectionSettings {
529 parity: Parity::Odd,
530 max_concurrent_requests: 64,
531 },
532 metadata: vec![],
533 on_connection: None,
534 keepalive: None,
535 resumable: false,
536 operation_store: None,
537 spawn_fn: default_spawn_fn(),
538 }
539 }
540
541 pub fn parity(mut self, parity: Parity) -> Self {
542 self.root_settings.parity = parity;
543 self
544 }
545
546 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
547 self.root_settings = settings;
548 self
549 }
550
551 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
552 self.root_settings.max_concurrent_requests = max_concurrent_requests;
553 self
554 }
555
556 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
557 self.metadata = metadata;
558 self
559 }
560
561 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
562 self.on_connection = Some(Box::new(acceptor));
563 self
564 }
565
566 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
567 self.keepalive = Some(keepalive);
568 self
569 }
570
571 pub fn resumable(mut self) -> Self {
572 self.resumable = true;
573 self
574 }
575
576 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
577 self.operation_store = Some(operation_store);
578 self
579 }
580
581 #[cfg(not(target_arch = "wasm32"))]
582 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
583 self.spawn_fn = Box::new(f);
584 self
585 }
586
587 #[cfg(target_arch = "wasm32")]
588 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
589 self.spawn_fn = Box::new(f);
590 self
591 }
592
593 #[cfg(not(target_arch = "wasm32"))]
594 pub async fn establish<Client: From<DriverCaller>>(
595 self,
596 handler: impl Handler<DriverReplySink> + 'static,
597 ) -> Result<(Client, SessionHandle), SessionError>
598 where
599 L: Link + Send + 'static,
600 L::Tx: MaybeSend + MaybeSync + 'static,
601 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
602 L::Rx: MaybeSend + 'static,
603 {
604 let Self {
605 link,
606 mode,
607 root_settings,
608 metadata,
609 on_connection,
610 keepalive,
611 resumable,
612 operation_store,
613 spawn_fn,
614 } = self;
615 match mode {
616 TransportMode::Bare => {
617 let link = initiate_transport(link, TransportMode::Bare)
618 .await
619 .map_err(session_error_from_transport)?;
620 Self::finish_with_bare_parts(
621 link,
622 root_settings,
623 metadata,
624 on_connection,
625 keepalive,
626 resumable,
627 operation_store,
628 spawn_fn,
629 handler,
630 )
631 .await
632 }
633 TransportMode::Stable => {
634 let link = initiate_transport(link, TransportMode::Stable)
635 .await
636 .map_err(session_error_from_transport)?;
637 Self::finish_with_stable_parts(
638 link,
639 root_settings,
640 metadata,
641 on_connection,
642 keepalive,
643 resumable,
644 operation_store,
645 spawn_fn,
646 handler,
647 )
648 .await
649 }
650 }
651 }
652
653 #[cfg(target_arch = "wasm32")]
654 pub async fn establish<Client: From<DriverCaller>>(
655 self,
656 handler: impl Handler<DriverReplySink> + 'static,
657 ) -> Result<(Client, SessionHandle), SessionError>
658 where
659 L: Link + 'static,
660 L::Tx: MaybeSend + MaybeSync + 'static,
661 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
662 L::Rx: MaybeSend + 'static,
663 {
664 let Self {
665 link,
666 mode,
667 root_settings,
668 metadata,
669 on_connection,
670 keepalive,
671 resumable,
672 operation_store,
673 spawn_fn,
674 } = self;
675 match mode {
676 TransportMode::Bare => {
677 let link = initiate_transport(link, TransportMode::Bare)
678 .await
679 .map_err(session_error_from_transport)?;
680 Self::finish_with_bare_parts(
681 link,
682 root_settings,
683 metadata,
684 on_connection,
685 keepalive,
686 resumable,
687 operation_store,
688 spawn_fn,
689 handler,
690 )
691 .await
692 }
693 TransportMode::Stable => Err(SessionError::Protocol(
694 "stable conduit transport selection is unsupported on wasm".into(),
695 )),
696 }
697 }
698
699 #[allow(clippy::too_many_arguments)]
700 async fn finish_with_bare_parts<Client: From<DriverCaller>>(
701 mut link: SplitLink<L::Tx, L::Rx>,
702 root_settings: ConnectionSettings,
703 metadata: Metadata<'a>,
704 on_connection: Option<Box<dyn ConnectionAcceptor>>,
705 keepalive: Option<SessionKeepaliveConfig>,
706 resumable: bool,
707 operation_store: Option<Arc<dyn OperationStore>>,
708 spawn_fn: SpawnFn,
709 handler: impl Handler<DriverReplySink> + 'static,
710 ) -> Result<(Client, SessionHandle), SessionError>
711 where
712 L: Link + 'static,
713 L::Tx: MaybeSend + MaybeSync + 'static,
714 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
715 L::Rx: MaybeSend + 'static,
716 {
717 let handshake_result =
718 handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
719 .await
720 .map_err(session_error_from_handshake)?;
721 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
722 .map_err(SessionError::Protocol)?;
723 let builder = SessionInitiatorBuilder::new(
724 BareConduit::with_message_plan(link, message_plan),
725 handshake_result,
726 );
727 Self::apply_common_parts(
728 builder,
729 root_settings,
730 metadata,
731 on_connection,
732 keepalive,
733 resumable,
734 None,
735 operation_store,
736 spawn_fn,
737 )
738 .establish(handler)
739 .await
740 }
741
742 #[cfg(not(target_arch = "wasm32"))]
743 #[allow(clippy::too_many_arguments)]
744 async fn finish_with_stable_parts<Client: From<DriverCaller>>(
745 mut link: SplitLink<L::Tx, L::Rx>,
746 root_settings: ConnectionSettings,
747 metadata: Metadata<'a>,
748 on_connection: Option<Box<dyn ConnectionAcceptor>>,
749 keepalive: Option<SessionKeepaliveConfig>,
750 resumable: bool,
751 operation_store: Option<Arc<dyn OperationStore>>,
752 spawn_fn: SpawnFn,
753 handler: impl Handler<DriverReplySink> + 'static,
754 ) -> Result<(Client, SessionHandle), SessionError>
755 where
756 L: Link + Send + 'static,
757 L::Tx: MaybeSend + MaybeSync + Send + 'static,
758 for<'p> <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
759 L::Rx: MaybeSend + Send + 'static,
760 {
761 let handshake_result =
762 handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
763 .await
764 .map_err(session_error_from_handshake)?;
765 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
766 .map_err(SessionError::Protocol)?;
767 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
768 link.tx,
769 link.rx,
770 None,
771 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
772 )
773 .await
774 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
775 .with_message_plan(message_plan);
776 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
777 Self::apply_common_parts(
778 builder,
779 root_settings,
780 metadata,
781 on_connection,
782 keepalive,
783 resumable,
784 None,
785 operation_store,
786 spawn_fn,
787 )
788 .establish(handler)
789 .await
790 }
791
792 #[allow(clippy::too_many_arguments)]
793 #[allow(clippy::too_many_arguments)]
794 fn apply_common_parts<C>(
795 mut builder: SessionInitiatorBuilder<'a, C>,
796 root_settings: ConnectionSettings,
797 metadata: Metadata<'a>,
798 on_connection: Option<Box<dyn ConnectionAcceptor>>,
799 keepalive: Option<SessionKeepaliveConfig>,
800 resumable: bool,
801 recoverer: Option<Box<dyn ConduitRecoverer>>,
802 operation_store: Option<Arc<dyn OperationStore>>,
803 spawn_fn: SpawnFn,
804 ) -> SessionInitiatorBuilder<'a, C> {
805 builder.root_settings = root_settings;
806 builder.metadata = metadata;
807 builder.on_connection = on_connection;
808 builder.keepalive = keepalive;
809 builder.resumable = resumable;
810 builder.recoverer = recoverer;
811 builder.operation_store = operation_store;
812 builder.spawn_fn = spawn_fn;
813 builder
814 }
815}
816
817#[cfg(not(target_arch = "wasm32"))]
818struct BareSourceRecoverer<S> {
819 source: S,
820 settings: ConnectionSettings,
821}
822
823#[cfg(not(target_arch = "wasm32"))]
824const SOURCE_RECOVERY_BACKOFF_MIN: Duration = Duration::from_millis(100);
825#[cfg(not(target_arch = "wasm32"))]
826const SOURCE_RECOVERY_BACKOFF_MAX: Duration = Duration::from_secs(5);
827
828#[cfg(not(target_arch = "wasm32"))]
829impl<S> ConduitRecoverer for BareSourceRecoverer<S>
830where
831 S: LinkSource,
832 S::Link: Link + Send + 'static,
833 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
834 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
835 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
836{
837 fn next_conduit<'a>(
838 &'a mut self,
839 resume_key: Option<&'a SessionResumeKey>,
840 ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
841 Box::pin(async move {
842 let mut backoff = SOURCE_RECOVERY_BACKOFF_MIN;
843 let mut use_resume_key = resume_key.is_some();
844
845 loop {
846 let selected_resume_key = if use_resume_key { resume_key } else { None };
847
848 let result: Result<super::RecoveredConduit, SessionError> = async {
849 let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
850 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
851 .await
852 .map_err(session_error_from_transport)?;
853 let handshake_result = handshake_as_initiator(
854 &link.tx,
855 &mut link.rx,
856 self.settings.clone(),
857 true,
858 selected_resume_key,
859 )
860 .await
861 .map_err(session_error_from_handshake)?;
862 let conduit = BareConduit::<MessageFamily, _>::new(link);
863 let (tx, rx) = conduit.split();
864 Ok(super::RecoveredConduit {
865 tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
866 rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
867 handshake: handshake_result,
868 })
869 }
870 .await;
871
872 if let Ok(conduit) = result {
873 return Ok(conduit);
874 }
875
876 if use_resume_key {
877 use_resume_key = false;
880 }
881
882 tokio::time::sleep(backoff).await;
883 backoff = backoff.saturating_mul(2).min(SOURCE_RECOVERY_BACKOFF_MAX);
884 }
885 })
886 }
887}
888
889#[cfg(not(target_arch = "wasm32"))]
890struct TransportedLinkSource<S> {
891 source: S,
892 mode: TransportMode,
893}
894
895#[cfg(not(target_arch = "wasm32"))]
896impl<S> LinkSource for TransportedLinkSource<S>
897where
898 S: LinkSource,
899 S::Link: Link + Send + 'static,
900 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
901 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
902 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
903{
904 type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
905
906 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
907 let attachment = self.source.next_link().await?;
908 let link = initiate_transport(attachment.into_link(), self.mode)
909 .await
910 .map_err(std::io::Error::other)?;
911 Ok(Attachment::initiator(link))
912 }
913}
914
915pub struct SessionAcceptorBuilder<'a, C> {
916 conduit: C,
917 handshake_result: HandshakeResult,
918 root_settings: ConnectionSettings,
919 metadata: Metadata<'a>,
920 on_connection: Option<Box<dyn ConnectionAcceptor>>,
921 keepalive: Option<SessionKeepaliveConfig>,
922 resumable: bool,
923 session_registry: Option<SessionRegistry>,
924 operation_store: Option<Arc<dyn OperationStore>>,
925 spawn_fn: SpawnFn,
926}
927
928impl<'a, C> SessionAcceptorBuilder<'a, C> {
929 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
930 let root_settings = handshake_result.our_settings.clone();
931 Self {
932 conduit,
933 handshake_result,
934 root_settings,
935 metadata: vec![],
936 on_connection: None,
937 keepalive: None,
938 resumable: false,
939 session_registry: None,
940 operation_store: None,
941 spawn_fn: default_spawn_fn(),
942 }
943 }
944
945 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
946 self.on_connection = Some(Box::new(acceptor));
947 self
948 }
949
950 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
951 self.keepalive = Some(keepalive);
952 self
953 }
954
955 pub fn resumable(mut self) -> Self {
956 self.resumable = true;
957 self
958 }
959
960 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
961 self.session_registry = Some(session_registry);
962 self
963 }
964
965 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
966 self.operation_store = Some(operation_store);
967 self
968 }
969
970 #[cfg(not(target_arch = "wasm32"))]
973 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
974 self.spawn_fn = Box::new(f);
975 self
976 }
977
978 #[cfg(target_arch = "wasm32")]
981 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
982 self.spawn_fn = Box::new(f);
983 self
984 }
985
986 #[moire::instrument]
987 pub async fn establish<Client: From<DriverCaller>>(
988 self,
989 handler: impl Handler<DriverReplySink> + 'static,
990 ) -> Result<(Client, SessionHandle), SessionError>
991 where
992 C: Conduit<Msg = MessageFamily> + 'static,
993 C::Tx: MaybeSend + MaybeSync + 'static,
994 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
995 C::Rx: MaybeSend + 'static,
996 {
997 let Self {
998 conduit,
999 handshake_result,
1000 root_settings,
1001 metadata: _metadata,
1002 on_connection,
1003 keepalive,
1004 resumable,
1005 session_registry,
1006 operation_store,
1007 spawn_fn,
1008 } = self;
1009 validate_negotiated_root_settings(&root_settings, &handshake_result)?;
1010 let (tx, rx) = conduit.split();
1011 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
1012 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
1013 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
1014 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
1015 let mut session = Session::pre_handshake(
1016 tx,
1017 rx,
1018 on_connection,
1019 open_rx,
1020 close_rx,
1021 resume_rx,
1022 control_tx.clone(),
1023 control_rx,
1024 keepalive,
1025 resumable,
1026 None,
1027 );
1028 let handle = session.establish_from_handshake(handshake_result)?;
1029 let resume_key = session.resume_key();
1030 let session_handle = SessionHandle {
1031 open_tx,
1032 close_tx,
1033 resume_tx,
1034 control_tx,
1035 resume_key,
1036 };
1037 if let (Some(registry), Some(key)) = (&session_registry, resume_key) {
1038 registry.insert(key, session_handle.clone());
1039 }
1040 let mut driver = match operation_store {
1041 Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
1042 None => Driver::new(handle, handler),
1043 };
1044 let client = Client::from(driver.caller());
1045 (spawn_fn)(Box::pin(async move { session.run().await }));
1046 #[cfg(not(target_arch = "wasm32"))]
1047 tokio::spawn(async move { driver.run().await });
1048 #[cfg(target_arch = "wasm32")]
1049 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
1050 Ok((client, session_handle))
1051 }
1052
1053 #[moire::instrument]
1054 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1055 self,
1056 handler: impl Handler<DriverReplySink> + 'static,
1057 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1058 where
1059 C: Conduit<Msg = MessageFamily> + 'static,
1060 C::Tx: MaybeSend + MaybeSync + 'static,
1061 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
1062 C::Rx: MaybeSend + 'static,
1063 {
1064 if let (Some(registry), Some(resume_key)) = (
1068 &self.session_registry,
1069 self.handshake_result.peer_resume_key,
1070 ) {
1071 if let Some(handle) = registry.get(&resume_key) {
1072 let (tx, rx) = self.conduit.split();
1073 if let Err(error) = handle
1074 .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1075 .await
1076 {
1077 registry.remove(&resume_key);
1078 return Err(error);
1079 }
1080 return Ok(SessionAcceptOutcome::Resumed);
1081 }
1082 }
1083
1084 let (client, session_handle) = self.establish(handler).await?;
1085 Ok(SessionAcceptOutcome::Established(client, session_handle))
1086 }
1087}
1088
1089pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
1090 link: L,
1091 root_settings: ConnectionSettings,
1092 metadata: Metadata<'a>,
1093 on_connection: Option<Box<dyn ConnectionAcceptor>>,
1094 keepalive: Option<SessionKeepaliveConfig>,
1095 resumable: bool,
1096 session_registry: Option<SessionRegistry>,
1097 operation_store: Option<Arc<dyn OperationStore>>,
1098 spawn_fn: SpawnFn,
1099}
1100
1101impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1102 fn new(link: L) -> Self {
1103 Self {
1104 link,
1105 root_settings: ConnectionSettings {
1106 parity: Parity::Even,
1107 max_concurrent_requests: 64,
1108 },
1109 metadata: vec![],
1110 on_connection: None,
1111 keepalive: None,
1112 resumable: true,
1113 session_registry: None,
1114 operation_store: None,
1115 spawn_fn: default_spawn_fn(),
1116 }
1117 }
1118
1119 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1120 self.root_settings = settings;
1121 self
1122 }
1123
1124 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1125 self.root_settings.max_concurrent_requests = max_concurrent_requests;
1126 self
1127 }
1128
1129 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1130 self.metadata = metadata;
1131 self
1132 }
1133
1134 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1135 self.on_connection = Some(Box::new(acceptor));
1136 self
1137 }
1138
1139 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1140 self.keepalive = Some(keepalive);
1141 self
1142 }
1143
1144 pub fn resumable(mut self) -> Self {
1145 self.resumable = true;
1146 self
1147 }
1148
1149 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1150 self.session_registry = Some(session_registry);
1151 self
1152 }
1153
1154 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1155 self.operation_store = Some(operation_store);
1156 self
1157 }
1158
1159 #[cfg(not(target_arch = "wasm32"))]
1160 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1161 self.spawn_fn = Box::new(f);
1162 self
1163 }
1164
1165 #[cfg(target_arch = "wasm32")]
1166 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1167 self.spawn_fn = Box::new(f);
1168 self
1169 }
1170
1171 #[moire::instrument]
1172 #[cfg(not(target_arch = "wasm32"))]
1173 pub async fn establish<Client: From<DriverCaller>>(
1174 self,
1175 handler: impl Handler<DriverReplySink> + 'static,
1176 ) -> Result<(Client, SessionHandle), SessionError>
1177 where
1178 L: Link + Send + 'static,
1179 L::Tx: MaybeSend + MaybeSync + 'static,
1180 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1181 L::Rx: MaybeSend + 'static,
1182 {
1183 let Self {
1184 link,
1185 root_settings,
1186 metadata,
1187 on_connection,
1188 keepalive,
1189 resumable,
1190 session_registry,
1191 operation_store,
1192 spawn_fn,
1193 } = self;
1194 let (mode, mut link) = accept_transport(link)
1195 .await
1196 .map_err(session_error_from_transport)?;
1197 match mode {
1198 TransportMode::Bare => {
1199 let handshake_result = handshake_as_acceptor(
1200 &link.tx,
1201 &mut link.rx,
1202 root_settings.clone(),
1203 true,
1204 resumable,
1205 None,
1206 )
1207 .await
1208 .map_err(session_error_from_handshake)?;
1209 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1210 .map_err(SessionError::Protocol)?;
1211 let builder = SessionAcceptorBuilder::new(
1212 BareConduit::with_message_plan(link, message_plan),
1213 handshake_result,
1214 );
1215 Self::apply_common_parts(
1216 builder,
1217 root_settings,
1218 metadata,
1219 on_connection,
1220 keepalive,
1221 resumable,
1222 session_registry,
1223 operation_store,
1224 spawn_fn,
1225 )
1226 .establish(handler)
1227 .await
1228 }
1229 TransportMode::Stable => {
1230 Self::finish_with_stable_parts(
1231 link,
1232 root_settings,
1233 metadata,
1234 on_connection,
1235 keepalive,
1236 resumable,
1237 session_registry,
1238 operation_store,
1239 spawn_fn,
1240 handler,
1241 )
1242 .await
1243 }
1244 }
1245 }
1246
1247 #[moire::instrument]
1248 #[cfg(not(target_arch = "wasm32"))]
1249 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1250 self,
1251 handler: impl Handler<DriverReplySink> + 'static,
1252 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1253 where
1254 L: Link + Send + 'static,
1255 L::Tx: MaybeSend + MaybeSync + 'static,
1256 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1257 L::Rx: MaybeSend + 'static,
1258 {
1259 let Self {
1260 link,
1261 root_settings,
1262 metadata,
1263 on_connection,
1264 keepalive,
1265 resumable,
1266 session_registry,
1267 operation_store,
1268 spawn_fn,
1269 } = self;
1270 let (mode, mut link) = accept_transport(link)
1271 .await
1272 .map_err(session_error_from_transport)?;
1273 match mode {
1274 TransportMode::Bare => {
1275 let handshake_result = handshake_as_acceptor(
1276 &link.tx,
1277 &mut link.rx,
1278 root_settings.clone(),
1279 true,
1280 resumable,
1281 None,
1282 )
1283 .await
1284 .map_err(session_error_from_handshake)?;
1285 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1286 .map_err(SessionError::Protocol)?;
1287 let builder = SessionAcceptorBuilder::new(
1288 BareConduit::with_message_plan(link, message_plan),
1289 handshake_result,
1290 );
1291 Self::apply_common_parts(
1292 builder,
1293 root_settings,
1294 metadata,
1295 on_connection,
1296 keepalive,
1297 resumable,
1298 session_registry,
1299 operation_store,
1300 spawn_fn,
1301 )
1302 .establish_or_resume(handler)
1303 .await
1304 }
1305 TransportMode::Stable => Self::finish_with_stable_parts(
1306 link,
1307 root_settings,
1308 metadata,
1309 on_connection,
1310 keepalive,
1311 resumable,
1312 session_registry,
1313 operation_store,
1314 spawn_fn,
1315 handler,
1316 )
1317 .await
1318 .map(|(client, handle)| SessionAcceptOutcome::Established(client, handle)),
1319 }
1320 }
1321
1322 #[cfg(target_arch = "wasm32")]
1323 pub async fn establish<Client: From<DriverCaller>>(
1324 self,
1325 handler: impl Handler<DriverReplySink> + 'static,
1326 ) -> Result<(Client, SessionHandle), SessionError>
1327 where
1328 L: Link + 'static,
1329 L::Tx: MaybeSend + MaybeSync + 'static,
1330 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1331 L::Rx: MaybeSend + 'static,
1332 {
1333 let Self {
1334 link,
1335 root_settings,
1336 metadata,
1337 on_connection,
1338 keepalive,
1339 resumable,
1340 session_registry,
1341 operation_store,
1342 spawn_fn,
1343 } = self;
1344 let (mode, mut link) = accept_transport(link)
1345 .await
1346 .map_err(session_error_from_transport)?;
1347 match mode {
1348 TransportMode::Bare => {
1349 let handshake_result = handshake_as_acceptor(
1350 &link.tx,
1351 &mut link.rx,
1352 root_settings.clone(),
1353 true,
1354 resumable,
1355 None,
1356 )
1357 .await
1358 .map_err(session_error_from_handshake)?;
1359 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1360 .map_err(SessionError::Protocol)?;
1361 let builder = SessionAcceptorBuilder::new(
1362 BareConduit::with_message_plan(link, message_plan),
1363 handshake_result,
1364 );
1365 Self::apply_common_parts(
1366 builder,
1367 root_settings,
1368 metadata,
1369 on_connection,
1370 keepalive,
1371 resumable,
1372 session_registry,
1373 operation_store,
1374 spawn_fn,
1375 )
1376 .establish(handler)
1377 .await
1378 }
1379 TransportMode::Stable => Err(SessionError::Protocol(
1380 "stable conduit transport selection is unsupported on wasm".into(),
1381 )),
1382 }
1383 }
1384
1385 #[cfg(target_arch = "wasm32")]
1386 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1387 self,
1388 handler: impl Handler<DriverReplySink> + 'static,
1389 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1390 where
1391 L: Link + 'static,
1392 L::Tx: MaybeSend + MaybeSync + 'static,
1393 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1394 L::Rx: MaybeSend + 'static,
1395 {
1396 let Self {
1397 link,
1398 root_settings,
1399 metadata,
1400 on_connection,
1401 keepalive,
1402 resumable,
1403 session_registry,
1404 operation_store,
1405 spawn_fn,
1406 } = self;
1407 let (mode, mut link) = accept_transport(link)
1408 .await
1409 .map_err(session_error_from_transport)?;
1410 match mode {
1411 TransportMode::Bare => {
1412 let handshake_result = handshake_as_acceptor(
1413 &link.tx,
1414 &mut link.rx,
1415 root_settings.clone(),
1416 true,
1417 resumable,
1418 None,
1419 )
1420 .await
1421 .map_err(session_error_from_handshake)?;
1422 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1423 .map_err(SessionError::Protocol)?;
1424 let builder = SessionAcceptorBuilder::new(
1425 BareConduit::with_message_plan(link, message_plan),
1426 handshake_result,
1427 );
1428 Self::apply_common_parts(
1429 builder,
1430 root_settings,
1431 metadata,
1432 on_connection,
1433 keepalive,
1434 resumable,
1435 session_registry,
1436 operation_store,
1437 spawn_fn,
1438 )
1439 .establish_or_resume(handler)
1440 .await
1441 }
1442 TransportMode::Stable => Err(SessionError::Protocol(
1443 "stable conduit transport selection is unsupported on wasm".into(),
1444 )),
1445 }
1446 }
1447
1448 #[cfg(not(target_arch = "wasm32"))]
1449 #[allow(clippy::too_many_arguments)]
1450 async fn finish_with_stable_parts<Client: From<DriverCaller>>(
1451 mut link: SplitLink<L::Tx, L::Rx>,
1452 root_settings: ConnectionSettings,
1453 metadata: Metadata<'a>,
1454 on_connection: Option<Box<dyn ConnectionAcceptor>>,
1455 keepalive: Option<SessionKeepaliveConfig>,
1456 resumable: bool,
1457 session_registry: Option<SessionRegistry>,
1458 operation_store: Option<Arc<dyn OperationStore>>,
1459 spawn_fn: SpawnFn,
1460 handler: impl Handler<DriverReplySink> + 'static,
1461 ) -> Result<(Client, SessionHandle), SessionError>
1462 where
1463 L: Link + Send + 'static,
1464 L::Tx: MaybeSend + MaybeSync + Send + 'static,
1465 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1466 L::Rx: MaybeSend + Send + 'static,
1467 {
1468 let handshake_result = handshake_as_acceptor(
1469 &link.tx,
1470 &mut link.rx,
1471 root_settings.clone(),
1472 true,
1473 resumable,
1474 None,
1475 )
1476 .await
1477 .map_err(session_error_from_handshake)?;
1478 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1479 .map_err(SessionError::Protocol)?;
1480 let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1483 .await
1484 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1485 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1486 link.tx,
1487 link.rx,
1488 Some(client_hello),
1489 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1490 )
1491 .await
1492 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1493 .with_message_plan(message_plan);
1494 let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1495 Self::apply_common_parts(
1496 builder,
1497 root_settings,
1498 metadata,
1499 on_connection,
1500 keepalive,
1501 resumable,
1502 session_registry,
1503 operation_store,
1504 spawn_fn,
1505 )
1506 .establish(handler)
1507 .await
1508 }
1509
1510 #[allow(clippy::too_many_arguments)]
1511 #[allow(clippy::too_many_arguments)]
1512 fn apply_common_parts<C>(
1513 mut builder: SessionAcceptorBuilder<'a, C>,
1514 root_settings: ConnectionSettings,
1515 metadata: Metadata<'a>,
1516 on_connection: Option<Box<dyn ConnectionAcceptor>>,
1517 keepalive: Option<SessionKeepaliveConfig>,
1518 resumable: bool,
1519 session_registry: Option<SessionRegistry>,
1520 operation_store: Option<Arc<dyn OperationStore>>,
1521 spawn_fn: SpawnFn,
1522 ) -> SessionAcceptorBuilder<'a, C> {
1523 builder.root_settings = root_settings;
1524 builder.metadata = metadata;
1525 builder.on_connection = on_connection;
1526 builder.keepalive = keepalive;
1527 builder.resumable = resumable;
1528 builder.session_registry = session_registry;
1529 builder.operation_store = operation_store;
1530 builder.spawn_fn = spawn_fn;
1531 builder
1532 }
1533}
1534
1535fn validate_negotiated_root_settings(
1536 expected_root_settings: &ConnectionSettings,
1537 handshake_result: &HandshakeResult,
1538) -> Result<(), SessionError> {
1539 if handshake_result.our_settings != *expected_root_settings {
1540 return Err(SessionError::Protocol(
1541 "negotiated root settings do not match builder settings".into(),
1542 ));
1543 }
1544 Ok(())
1545}
1546
1547fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1548 match error {
1549 crate::HandshakeError::Io(io) => SessionError::Io(io),
1550 crate::HandshakeError::PeerClosed => {
1551 SessionError::Protocol("peer closed during handshake".into())
1552 }
1553 crate::HandshakeError::NotResumable => SessionError::NotResumable,
1554 other => SessionError::Protocol(other.to_string()),
1555 }
1556}
1557
1558fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1559 match error {
1560 crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1561 crate::TransportPrologueError::LinkDead => {
1562 SessionError::Protocol("link closed during transport prologue".into())
1563 }
1564 crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1565 crate::TransportPrologueError::Rejected(reason) => {
1566 SessionError::Protocol(format!("transport rejected: {reason}"))
1567 }
1568 }
1569}