1use std::{
2 collections::HashMap,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6};
7
8use moire::sync::mpsc;
9use vox_types::{
10 Conduit, ConduitTx, ConnectionSettings, Handler, HandshakeResult, Link, MaybeSend, MaybeSync,
11 MessageFamily, Metadata, Parity, SessionResumeKey, SplitLink,
12};
13
14#[cfg(not(target_arch = "wasm32"))]
15use crate::{Attachment, LinkSource, StableConduit};
16use crate::{
17 BareConduit, IntoConduit, OperationStore, TransportMode, accept_transport,
18 handshake_as_acceptor, handshake_as_initiator, initiate_transport,
19};
20
21use super::{
22 CloseRequest, ConduitRecoverer, ConnectionAcceptor, OpenRequest, Session, SessionError,
23 SessionHandle, SessionKeepaliveConfig,
24};
25use crate::{Driver, DriverCaller, DriverReplySink};
26
/// Boxed, pinned, unit-returning future that is handed to a session spawn
/// function. On non-wasm targets the future is required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// Boxed, pinned, unit-returning future that is handed to a session spawn
/// function. The wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
pub type BoxSessionFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
33
/// One-shot callback that receives the boxed session future and is
/// responsible for running it. Must itself be `Send` on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + Send + 'static>;
/// One-shot callback that receives the boxed session future and is
/// responsible for running it. The wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type SpawnFn = Box<dyn FnOnce(BoxSessionFuture) + 'static>;
38
39#[cfg(not(target_arch = "wasm32"))]
40fn default_spawn_fn() -> SpawnFn {
41 Box::new(|fut| {
42 tokio::spawn(fut);
43 })
44}
45
/// Spawner installed by every builder unless `spawn_fn` overrides it: on
/// wasm32 the future is handed to `wasm_bindgen_futures::spawn_local`.
#[cfg(target_arch = "wasm32")]
fn default_spawn_fn() -> SpawnFn {
    Box::new(|session_future: BoxSessionFuture| {
        wasm_bindgen_futures::spawn_local(session_future)
    })
}
52
53pub fn initiator_conduit<I: IntoConduit>(
56 into_conduit: I,
57 handshake_result: HandshakeResult,
58) -> SessionInitiatorBuilder<'static, I::Conduit> {
59 SessionInitiatorBuilder::new(into_conduit.into_conduit(), handshake_result)
60}
61
/// Starts an initiator-side session builder that obtains its links from
/// `source` and uses the given transport `mode`.
///
/// Only available on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
pub fn initiator<S>(source: S, mode: TransportMode) -> SessionSourceInitiatorBuilder<'static, S>
where
    S: LinkSource,
{
    SessionSourceInitiatorBuilder::new(source, mode)
}
69
70pub fn acceptor_conduit<I: IntoConduit>(
71 into_conduit: I,
72 handshake_result: HandshakeResult,
73) -> SessionAcceptorBuilder<'static, I::Conduit> {
74 SessionAcceptorBuilder::new(into_conduit.into_conduit(), handshake_result)
75}
76
77pub async fn initiator_on_link<L: Link>(
80 link: L,
81 settings: ConnectionSettings,
82) -> Result<
83 SessionInitiatorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
84 SessionError,
85>
86where
87 L::Tx: MaybeSend + MaybeSync + 'static,
88 L::Rx: MaybeSend + 'static,
89{
90 let (tx, mut rx) = link.split();
91 let handshake_result = handshake_as_initiator(&tx, &mut rx, settings, true, None)
92 .await
93 .map_err(session_error_from_handshake)?;
94 let message_plan =
95 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
96 Ok(SessionInitiatorBuilder::new(
97 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
98 handshake_result,
99 ))
100}
101
102pub async fn acceptor_on_link<L: Link>(
105 link: L,
106 settings: ConnectionSettings,
107) -> Result<
108 SessionAcceptorBuilder<'static, BareConduit<MessageFamily, SplitLink<L::Tx, L::Rx>>>,
109 SessionError,
110>
111where
112 L::Tx: MaybeSend + MaybeSync + 'static,
113 L::Rx: MaybeSend + 'static,
114{
115 let (tx, mut rx) = link.split();
116 let handshake_result = handshake_as_acceptor(&tx, &mut rx, settings, true, false, None)
117 .await
118 .map_err(session_error_from_handshake)?;
119 let message_plan =
120 crate::MessagePlan::from_handshake(&handshake_result).map_err(SessionError::Protocol)?;
121 Ok(SessionAcceptorBuilder::new(
122 BareConduit::with_message_plan(SplitLink { tx, rx }, message_plan),
123 handshake_result,
124 ))
125}
126
/// Starts an initiator-side builder over a single `link` using the given
/// transport `mode`. Transport setup and the handshake are deferred until the
/// builder's `establish` is called.
pub fn initiator_on<L: Link>(
    link: L,
    mode: TransportMode,
) -> SessionTransportInitiatorBuilder<'static, L> {
    SessionTransportInitiatorBuilder::new(link, mode)
}
133
/// Alias for [`initiator_on`]; forwards both arguments unchanged.
pub fn initiator_transport<L: Link>(
    link: L,
    mode: TransportMode,
) -> SessionTransportInitiatorBuilder<'static, L> {
    initiator_on(link, mode)
}
140
/// Starts an acceptor-side builder over a single `link`. No transport mode is
/// passed here; the acceptor builder obtains it from `accept_transport` when
/// `establish` runs.
pub fn acceptor_on<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
    SessionTransportAcceptorBuilder::new(link)
}
144
/// Alias for [`acceptor_on`]; forwards `link` unchanged.
pub fn acceptor_transport<L: Link>(link: L) -> SessionTransportAcceptorBuilder<'static, L> {
    acceptor_on(link)
}
148
/// Shared map from session resume keys to session handles.
///
/// Cloning the registry clones the `Arc`, so all clones observe the same map.
#[derive(Clone, Default)]
pub struct SessionRegistry {
    /// Mutex-guarded map shared between all clones of this registry.
    inner: Arc<Mutex<HashMap<SessionResumeKey, SessionHandle>>>,
}
153
154impl SessionRegistry {
155 fn get(&self, key: &SessionResumeKey) -> Option<SessionHandle> {
156 self.inner
157 .lock()
158 .expect("session registry poisoned")
159 .get(key)
160 .cloned()
161 }
162
163 fn insert(&self, key: SessionResumeKey, handle: SessionHandle) {
164 self.inner
165 .lock()
166 .expect("session registry poisoned")
167 .insert(key, handle);
168 }
169
170 fn remove(&self, key: &SessionResumeKey) {
171 self.inner
172 .lock()
173 .expect("session registry poisoned")
174 .remove(key);
175 }
176}
177
/// Result of `SessionAcceptorBuilder::establish_or_resume`.
pub enum SessionAcceptOutcome<Client> {
    /// A new session was established; carries the client and its handle.
    Established(Client, SessionHandle),
    /// The incoming conduit was handed to an already-registered session.
    Resumed,
}
182
/// Builder for the initiator side of a session over an already-handshaken
/// conduit `C`.
pub struct SessionInitiatorBuilder<'a, C> {
    /// Conduit the session will be split from.
    conduit: C,
    /// Handshake outcome the session is established from.
    handshake_result: HandshakeResult,
    /// Settings checked against the handshake by
    /// `validate_negotiated_root_settings` in `establish`.
    root_settings: ConnectionSettings,
    /// Stored but not consumed by `establish` (it is bound to `_metadata`).
    metadata: Metadata<'a>,
    /// Optional acceptor passed through to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed through to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumability flag passed through to the session.
    resumable: bool,
    /// Optional conduit recoverer passed through to the session.
    recoverer: Option<Box<dyn ConduitRecoverer>>,
    /// When set, the driver is built with `Driver::with_operation_store`.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Callback used to run the session future.
    spawn_fn: SpawnFn,
}
195
196impl<'a, C> SessionInitiatorBuilder<'a, C> {
197 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
198 let root_settings = handshake_result.our_settings.clone();
199 Self {
200 conduit,
201 handshake_result,
202 root_settings,
203 metadata: vec![],
204 on_connection: None,
205 keepalive: None,
206 resumable: false,
207 recoverer: None,
208 operation_store: None,
209 spawn_fn: default_spawn_fn(),
210 }
211 }
212
213 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
214 self.on_connection = Some(Box::new(acceptor));
215 self
216 }
217
218 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
219 self.keepalive = Some(keepalive);
220 self
221 }
222
223 pub fn resumable(mut self) -> Self {
224 self.resumable = true;
225 self
226 }
227
228 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
229 self.operation_store = Some(operation_store);
230 self
231 }
232
233 #[cfg(not(target_arch = "wasm32"))]
236 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
237 self.spawn_fn = Box::new(f);
238 self
239 }
240
241 #[cfg(target_arch = "wasm32")]
244 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
245 self.spawn_fn = Box::new(f);
246 self
247 }
248
249 pub async fn establish<Client: From<DriverCaller>>(
250 self,
251 handler: impl Handler<DriverReplySink> + 'static,
252 ) -> Result<(Client, SessionHandle), SessionError>
253 where
254 C: Conduit<Msg = MessageFamily> + 'static,
255 C::Tx: MaybeSend + MaybeSync + 'static,
256 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
257 C::Rx: MaybeSend + 'static,
258 {
259 let Self {
260 conduit,
261 handshake_result,
262 root_settings,
263 metadata: _metadata,
264 on_connection,
265 keepalive,
266 resumable,
267 recoverer,
268 operation_store,
269 spawn_fn,
270 } = self;
271 validate_negotiated_root_settings(&root_settings, &handshake_result)?;
272 let (tx, rx) = conduit.split();
273 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
274 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
275 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
276 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
277 let mut session = Session::pre_handshake(
278 tx,
279 rx,
280 on_connection,
281 open_rx,
282 close_rx,
283 resume_rx,
284 control_tx.clone(),
285 control_rx,
286 keepalive,
287 resumable,
288 recoverer,
289 );
290 let handle = session.establish_from_handshake(handshake_result)?;
291 let resume_key = session.resume_key();
292 let session_handle = SessionHandle {
293 open_tx,
294 close_tx,
295 resume_tx,
296 control_tx,
297 resume_key,
298 };
299 let mut driver = match operation_store {
300 Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
301 None => Driver::new(handle, handler),
302 };
303 let client = Client::from(driver.caller());
304 (spawn_fn)(Box::pin(async move { session.run().await }));
305 #[cfg(not(target_arch = "wasm32"))]
306 tokio::spawn(async move { driver.run().await });
307 #[cfg(target_arch = "wasm32")]
308 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
309 Ok((client, session_handle))
310 }
311}
312
/// Builder for the initiator side of a session whose links come from a link
/// source `S`. Only compiled on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
pub struct SessionSourceInitiatorBuilder<'a, S> {
    /// Provider of links; `establish` pulls the first link from it.
    source: S,
    /// Transport mode selected by the caller.
    mode: TransportMode,
    /// Settings offered during the handshake.
    root_settings: ConnectionSettings,
    /// Metadata forwarded to the inner `SessionInitiatorBuilder`.
    metadata: Metadata<'a>,
    /// Optional acceptor passed through to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed through to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumability flag; `new` initialises it to `true`.
    resumable: bool,
    /// Optional operation store for the driver.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Callback used to run the session future.
    spawn_fn: SpawnFn,
}
325
326#[cfg(not(target_arch = "wasm32"))]
327impl<'a, S> SessionSourceInitiatorBuilder<'a, S> {
328 fn new(source: S, mode: TransportMode) -> Self {
329 Self {
330 source,
331 mode,
332 root_settings: ConnectionSettings {
333 parity: Parity::Odd,
334 max_concurrent_requests: 64,
335 },
336 metadata: vec![],
337 on_connection: None,
338 keepalive: None,
339 resumable: true,
340 operation_store: None,
341 spawn_fn: default_spawn_fn(),
342 }
343 }
344
345 pub fn parity(mut self, parity: Parity) -> Self {
346 self.root_settings.parity = parity;
347 self
348 }
349
350 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
351 self.root_settings = settings;
352 self
353 }
354
355 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
356 self.root_settings.max_concurrent_requests = max_concurrent_requests;
357 self
358 }
359
360 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
361 self.metadata = metadata;
362 self
363 }
364
365 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
366 self.on_connection = Some(Box::new(acceptor));
367 self
368 }
369
370 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
371 self.keepalive = Some(keepalive);
372 self
373 }
374
375 pub fn resumable(mut self) -> Self {
376 self.resumable = true;
377 self
378 }
379
380 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
381 self.operation_store = Some(operation_store);
382 self
383 }
384
385 #[cfg(not(target_arch = "wasm32"))]
386 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
387 self.spawn_fn = Box::new(f);
388 self
389 }
390
391 #[cfg(target_arch = "wasm32")]
392 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
393 self.spawn_fn = Box::new(f);
394 self
395 }
396
397 pub async fn establish<Client: From<DriverCaller>>(
398 self,
399 handler: impl Handler<DriverReplySink> + 'static,
400 ) -> Result<(Client, SessionHandle), SessionError>
401 where
402 S: LinkSource,
403 S::Link: Link + Send + 'static,
404 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
405 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
406 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
407 {
408 let Self {
409 mut source,
410 mode,
411 root_settings,
412 metadata,
413 on_connection,
414 keepalive,
415 resumable,
416 operation_store,
417 spawn_fn,
418 } = self;
419
420 match mode {
421 TransportMode::Bare => {
422 let attachment = source.next_link().await.map_err(SessionError::Io)?;
423 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
424 .await
425 .map_err(session_error_from_transport)?;
426 let handshake_result = handshake_as_initiator(
427 &link.tx,
428 &mut link.rx,
429 root_settings.clone(),
430 true,
431 None,
432 )
433 .await
434 .map_err(session_error_from_handshake)?;
435 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
436 .map_err(SessionError::Protocol)?;
437 let builder = SessionInitiatorBuilder::new(
438 BareConduit::with_message_plan(link, message_plan),
439 handshake_result,
440 );
441 let recoverer = Box::new(BareSourceRecoverer {
442 source,
443 settings: root_settings.clone(),
444 });
445 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
446 builder,
447 root_settings,
448 metadata,
449 on_connection,
450 keepalive,
451 resumable,
452 Some(recoverer),
453 operation_store,
454 spawn_fn,
455 )
456 .establish(handler)
457 .await
458 }
459 TransportMode::Stable => {
460 let attachment = source.next_link().await.map_err(SessionError::Io)?;
463 let mut link = initiate_transport(attachment.into_link(), TransportMode::Stable)
464 .await
465 .map_err(session_error_from_transport)?;
466 let handshake_result = handshake_as_initiator(
467 &link.tx,
468 &mut link.rx,
469 root_settings.clone(),
470 true,
471 None,
472 )
473 .await
474 .map_err(session_error_from_handshake)?;
475 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
476 .map_err(SessionError::Protocol)?;
477 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
478 link.tx,
479 link.rx,
480 None, TransportedLinkSource {
482 source,
483 mode: TransportMode::Stable,
484 },
485 )
486 .await
487 .map_err(|error| {
488 SessionError::Protocol(format!("stable conduit setup failed: {error}"))
489 })?
490 .with_message_plan(message_plan);
491 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
492 SessionTransportInitiatorBuilder::<S::Link>::apply_common_parts(
493 builder,
494 root_settings,
495 metadata,
496 on_connection,
497 keepalive,
498 resumable,
499 None,
500 operation_store,
501 spawn_fn,
502 )
503 .establish(handler)
504 .await
505 }
506 }
507 }
508}
509
/// Builder for the initiator side of a session over a single link `L`, with
/// transport setup and handshake deferred to `establish`.
pub struct SessionTransportInitiatorBuilder<'a, L> {
    /// Link the transport is initiated on.
    link: L,
    /// Transport mode selected by the caller.
    mode: TransportMode,
    /// Settings offered during the handshake.
    root_settings: ConnectionSettings,
    /// Metadata forwarded to the inner `SessionInitiatorBuilder`.
    metadata: Metadata<'a>,
    /// Optional acceptor passed through to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed through to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumability flag; `new` initialises it to `false`.
    resumable: bool,
    /// Optional operation store for the driver.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Callback used to run the session future.
    spawn_fn: SpawnFn,
}
521
522impl<'a, L> SessionTransportInitiatorBuilder<'a, L> {
523 fn new(link: L, mode: TransportMode) -> Self {
524 Self {
525 link,
526 mode,
527 root_settings: ConnectionSettings {
528 parity: Parity::Odd,
529 max_concurrent_requests: 64,
530 },
531 metadata: vec![],
532 on_connection: None,
533 keepalive: None,
534 resumable: false,
535 operation_store: None,
536 spawn_fn: default_spawn_fn(),
537 }
538 }
539
540 pub fn parity(mut self, parity: Parity) -> Self {
541 self.root_settings.parity = parity;
542 self
543 }
544
545 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
546 self.root_settings = settings;
547 self
548 }
549
550 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
551 self.root_settings.max_concurrent_requests = max_concurrent_requests;
552 self
553 }
554
555 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
556 self.metadata = metadata;
557 self
558 }
559
560 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
561 self.on_connection = Some(Box::new(acceptor));
562 self
563 }
564
565 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
566 self.keepalive = Some(keepalive);
567 self
568 }
569
570 pub fn resumable(mut self) -> Self {
571 self.resumable = true;
572 self
573 }
574
575 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
576 self.operation_store = Some(operation_store);
577 self
578 }
579
580 #[cfg(not(target_arch = "wasm32"))]
581 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
582 self.spawn_fn = Box::new(f);
583 self
584 }
585
586 #[cfg(target_arch = "wasm32")]
587 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
588 self.spawn_fn = Box::new(f);
589 self
590 }
591
592 #[cfg(not(target_arch = "wasm32"))]
593 pub async fn establish<Client: From<DriverCaller>>(
594 self,
595 handler: impl Handler<DriverReplySink> + 'static,
596 ) -> Result<(Client, SessionHandle), SessionError>
597 where
598 L: Link + Send + 'static,
599 L::Tx: MaybeSend + MaybeSync + 'static,
600 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
601 L::Rx: MaybeSend + 'static,
602 {
603 let Self {
604 link,
605 mode,
606 root_settings,
607 metadata,
608 on_connection,
609 keepalive,
610 resumable,
611 operation_store,
612 spawn_fn,
613 } = self;
614 match mode {
615 TransportMode::Bare => {
616 let link = initiate_transport(link, TransportMode::Bare)
617 .await
618 .map_err(session_error_from_transport)?;
619 Self::finish_with_bare_parts(
620 link,
621 root_settings,
622 metadata,
623 on_connection,
624 keepalive,
625 resumable,
626 operation_store,
627 spawn_fn,
628 handler,
629 )
630 .await
631 }
632 TransportMode::Stable => {
633 let link = initiate_transport(link, TransportMode::Stable)
634 .await
635 .map_err(session_error_from_transport)?;
636 Self::finish_with_stable_parts(
637 link,
638 root_settings,
639 metadata,
640 on_connection,
641 keepalive,
642 resumable,
643 operation_store,
644 spawn_fn,
645 handler,
646 )
647 .await
648 }
649 }
650 }
651
652 #[cfg(target_arch = "wasm32")]
653 pub async fn establish<Client: From<DriverCaller>>(
654 self,
655 handler: impl Handler<DriverReplySink> + 'static,
656 ) -> Result<(Client, SessionHandle), SessionError>
657 where
658 L: Link + 'static,
659 L::Tx: MaybeSend + MaybeSync + 'static,
660 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
661 L::Rx: MaybeSend + 'static,
662 {
663 let Self {
664 link,
665 mode,
666 root_settings,
667 metadata,
668 on_connection,
669 keepalive,
670 resumable,
671 operation_store,
672 spawn_fn,
673 } = self;
674 match mode {
675 TransportMode::Bare => {
676 let link = initiate_transport(link, TransportMode::Bare)
677 .await
678 .map_err(session_error_from_transport)?;
679 Self::finish_with_bare_parts(
680 link,
681 root_settings,
682 metadata,
683 on_connection,
684 keepalive,
685 resumable,
686 operation_store,
687 spawn_fn,
688 handler,
689 )
690 .await
691 }
692 TransportMode::Stable => Err(SessionError::Protocol(
693 "stable conduit transport selection is unsupported on wasm".into(),
694 )),
695 }
696 }
697
698 #[allow(clippy::too_many_arguments)]
699 async fn finish_with_bare_parts<Client: From<DriverCaller>>(
700 mut link: SplitLink<L::Tx, L::Rx>,
701 root_settings: ConnectionSettings,
702 metadata: Metadata<'a>,
703 on_connection: Option<Box<dyn ConnectionAcceptor>>,
704 keepalive: Option<SessionKeepaliveConfig>,
705 resumable: bool,
706 operation_store: Option<Arc<dyn OperationStore>>,
707 spawn_fn: SpawnFn,
708 handler: impl Handler<DriverReplySink> + 'static,
709 ) -> Result<(Client, SessionHandle), SessionError>
710 where
711 L: Link + 'static,
712 L::Tx: MaybeSend + MaybeSync + 'static,
713 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
714 L::Rx: MaybeSend + 'static,
715 {
716 let handshake_result =
717 handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
718 .await
719 .map_err(session_error_from_handshake)?;
720 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
721 .map_err(SessionError::Protocol)?;
722 let builder = SessionInitiatorBuilder::new(
723 BareConduit::with_message_plan(link, message_plan),
724 handshake_result,
725 );
726 Self::apply_common_parts(
727 builder,
728 root_settings,
729 metadata,
730 on_connection,
731 keepalive,
732 resumable,
733 None,
734 operation_store,
735 spawn_fn,
736 )
737 .establish(handler)
738 .await
739 }
740
741 #[cfg(not(target_arch = "wasm32"))]
742 #[allow(clippy::too_many_arguments)]
743 async fn finish_with_stable_parts<Client: From<DriverCaller>>(
744 mut link: SplitLink<L::Tx, L::Rx>,
745 root_settings: ConnectionSettings,
746 metadata: Metadata<'a>,
747 on_connection: Option<Box<dyn ConnectionAcceptor>>,
748 keepalive: Option<SessionKeepaliveConfig>,
749 resumable: bool,
750 operation_store: Option<Arc<dyn OperationStore>>,
751 spawn_fn: SpawnFn,
752 handler: impl Handler<DriverReplySink> + 'static,
753 ) -> Result<(Client, SessionHandle), SessionError>
754 where
755 L: Link + Send + 'static,
756 L::Tx: MaybeSend + MaybeSync + Send + 'static,
757 for<'p> <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
758 L::Rx: MaybeSend + Send + 'static,
759 {
760 let handshake_result =
761 handshake_as_initiator(&link.tx, &mut link.rx, root_settings.clone(), true, None)
762 .await
763 .map_err(session_error_from_handshake)?;
764 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
765 .map_err(SessionError::Protocol)?;
766 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
767 link.tx,
768 link.rx,
769 None,
770 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
771 )
772 .await
773 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
774 .with_message_plan(message_plan);
775 let builder = SessionInitiatorBuilder::new(conduit, handshake_result);
776 Self::apply_common_parts(
777 builder,
778 root_settings,
779 metadata,
780 on_connection,
781 keepalive,
782 resumable,
783 None,
784 operation_store,
785 spawn_fn,
786 )
787 .establish(handler)
788 .await
789 }
790
791 #[allow(clippy::too_many_arguments)]
792 #[allow(clippy::too_many_arguments)]
793 fn apply_common_parts<C>(
794 mut builder: SessionInitiatorBuilder<'a, C>,
795 root_settings: ConnectionSettings,
796 metadata: Metadata<'a>,
797 on_connection: Option<Box<dyn ConnectionAcceptor>>,
798 keepalive: Option<SessionKeepaliveConfig>,
799 resumable: bool,
800 recoverer: Option<Box<dyn ConduitRecoverer>>,
801 operation_store: Option<Arc<dyn OperationStore>>,
802 spawn_fn: SpawnFn,
803 ) -> SessionInitiatorBuilder<'a, C> {
804 builder.root_settings = root_settings;
805 builder.metadata = metadata;
806 builder.on_connection = on_connection;
807 builder.keepalive = keepalive;
808 builder.resumable = resumable;
809 builder.recoverer = recoverer;
810 builder.operation_store = operation_store;
811 builder.spawn_fn = spawn_fn;
812 builder
813 }
814}
815
/// Conduit recoverer for bare transport: holds the link source and the
/// connection settings needed to handshake on a fresh link.
#[cfg(not(target_arch = "wasm32"))]
struct BareSourceRecoverer<S> {
    /// Source that yields replacement links.
    source: S,
    /// Settings offered in each recovery handshake.
    settings: ConnectionSettings,
}
821
822#[cfg(not(target_arch = "wasm32"))]
823impl<S> ConduitRecoverer for BareSourceRecoverer<S>
824where
825 S: LinkSource,
826 S::Link: Link + Send + 'static,
827 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
828 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
829 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
830{
831 fn next_conduit<'a>(
832 &'a mut self,
833 resume_key: Option<&'a SessionResumeKey>,
834 ) -> vox_types::BoxFut<'a, Result<super::RecoveredConduit, SessionError>> {
835 Box::pin(async move {
836 let attachment = self.source.next_link().await.map_err(SessionError::Io)?;
837 let mut link = initiate_transport(attachment.into_link(), TransportMode::Bare)
838 .await
839 .map_err(session_error_from_transport)?;
840 let handshake_result = handshake_as_initiator(
841 &link.tx,
842 &mut link.rx,
843 self.settings.clone(),
844 true,
845 resume_key,
846 )
847 .await
848 .map_err(session_error_from_handshake)?;
849 let conduit = BareConduit::<MessageFamily, _>::new(link);
850 let (tx, rx) = conduit.split();
851 Ok(super::RecoveredConduit {
852 tx: Arc::new(tx) as Arc<dyn crate::DynConduitTx>,
853 rx: Box::new(rx) as Box<dyn crate::DynConduitRx>,
854 handshake: handshake_result,
855 })
856 })
857 }
858}
859
/// Link source adapter that initiates transport on every link produced by the
/// wrapped source before handing it out.
#[cfg(not(target_arch = "wasm32"))]
struct TransportedLinkSource<S> {
    /// Underlying source of raw links.
    source: S,
    /// Transport mode initiated on each link.
    mode: TransportMode,
}
865
866#[cfg(not(target_arch = "wasm32"))]
867impl<S> LinkSource for TransportedLinkSource<S>
868where
869 S: LinkSource,
870 S::Link: Link + Send + 'static,
871 <S::Link as Link>::Tx: MaybeSend + MaybeSync + Send + 'static,
872 <<S::Link as Link>::Tx as vox_types::LinkTx>::Permit: MaybeSend,
873 <S::Link as Link>::Rx: MaybeSend + Send + 'static,
874{
875 type Link = SplitLink<<S::Link as Link>::Tx, <S::Link as Link>::Rx>;
876
877 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
878 let attachment = self.source.next_link().await?;
879 let link = initiate_transport(attachment.into_link(), self.mode)
880 .await
881 .map_err(std::io::Error::other)?;
882 Ok(Attachment::initiator(link))
883 }
884}
885
/// Builder for the acceptor side of a session over an already-handshaken
/// conduit `C`.
pub struct SessionAcceptorBuilder<'a, C> {
    /// Conduit the session will be split from.
    conduit: C,
    /// Handshake outcome the session is established (or resumed) from.
    handshake_result: HandshakeResult,
    /// Settings checked against the handshake by
    /// `validate_negotiated_root_settings` in `establish`.
    root_settings: ConnectionSettings,
    /// Stored but not consumed by `establish` (it is bound to `_metadata`).
    metadata: Metadata<'a>,
    /// Optional acceptor passed through to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed through to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumability flag passed through to the session.
    resumable: bool,
    /// Registry consulted by `establish_or_resume` and populated by
    /// `establish` when the session has a resume key.
    session_registry: Option<SessionRegistry>,
    /// When set, the driver is built with `Driver::with_operation_store`.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Callback used to run the session future.
    spawn_fn: SpawnFn,
}
898
899impl<'a, C> SessionAcceptorBuilder<'a, C> {
900 fn new(conduit: C, handshake_result: HandshakeResult) -> Self {
901 let root_settings = handshake_result.our_settings.clone();
902 Self {
903 conduit,
904 handshake_result,
905 root_settings,
906 metadata: vec![],
907 on_connection: None,
908 keepalive: None,
909 resumable: false,
910 session_registry: None,
911 operation_store: None,
912 spawn_fn: default_spawn_fn(),
913 }
914 }
915
916 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
917 self.on_connection = Some(Box::new(acceptor));
918 self
919 }
920
921 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
922 self.keepalive = Some(keepalive);
923 self
924 }
925
926 pub fn resumable(mut self) -> Self {
927 self.resumable = true;
928 self
929 }
930
931 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
932 self.session_registry = Some(session_registry);
933 self
934 }
935
936 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
937 self.operation_store = Some(operation_store);
938 self
939 }
940
941 #[cfg(not(target_arch = "wasm32"))]
944 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
945 self.spawn_fn = Box::new(f);
946 self
947 }
948
949 #[cfg(target_arch = "wasm32")]
952 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
953 self.spawn_fn = Box::new(f);
954 self
955 }
956
957 #[moire::instrument]
958 pub async fn establish<Client: From<DriverCaller>>(
959 self,
960 handler: impl Handler<DriverReplySink> + 'static,
961 ) -> Result<(Client, SessionHandle), SessionError>
962 where
963 C: Conduit<Msg = MessageFamily> + 'static,
964 C::Tx: MaybeSend + MaybeSync + 'static,
965 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
966 C::Rx: MaybeSend + 'static,
967 {
968 let Self {
969 conduit,
970 handshake_result,
971 root_settings,
972 metadata: _metadata,
973 on_connection,
974 keepalive,
975 resumable,
976 session_registry,
977 operation_store,
978 spawn_fn,
979 } = self;
980 validate_negotiated_root_settings(&root_settings, &handshake_result)?;
981 let (tx, rx) = conduit.split();
982 let (open_tx, open_rx) = mpsc::channel::<OpenRequest>("session.open", 4);
983 let (close_tx, close_rx) = mpsc::channel::<CloseRequest>("session.close", 4);
984 let (resume_tx, resume_rx) = mpsc::channel::<super::ResumeRequest>("session.resume", 1);
985 let (control_tx, control_rx) = mpsc::unbounded_channel("session.control");
986 let mut session = Session::pre_handshake(
987 tx,
988 rx,
989 on_connection,
990 open_rx,
991 close_rx,
992 resume_rx,
993 control_tx.clone(),
994 control_rx,
995 keepalive,
996 resumable,
997 None,
998 );
999 let handle = session.establish_from_handshake(handshake_result)?;
1000 let resume_key = session.resume_key();
1001 let session_handle = SessionHandle {
1002 open_tx,
1003 close_tx,
1004 resume_tx,
1005 control_tx,
1006 resume_key,
1007 };
1008 if let (Some(registry), Some(key)) = (&session_registry, resume_key) {
1009 registry.insert(key, session_handle.clone());
1010 }
1011 let mut driver = match operation_store {
1012 Some(operation_store) => Driver::with_operation_store(handle, handler, operation_store),
1013 None => Driver::new(handle, handler),
1014 };
1015 let client = Client::from(driver.caller());
1016 (spawn_fn)(Box::pin(async move { session.run().await }));
1017 #[cfg(not(target_arch = "wasm32"))]
1018 tokio::spawn(async move { driver.run().await });
1019 #[cfg(target_arch = "wasm32")]
1020 wasm_bindgen_futures::spawn_local(async move { driver.run().await });
1021 Ok((client, session_handle))
1022 }
1023
1024 #[moire::instrument]
1025 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1026 self,
1027 handler: impl Handler<DriverReplySink> + 'static,
1028 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1029 where
1030 C: Conduit<Msg = MessageFamily> + 'static,
1031 C::Tx: MaybeSend + MaybeSync + 'static,
1032 for<'p> <C::Tx as ConduitTx>::Permit<'p>: MaybeSend,
1033 C::Rx: MaybeSend + 'static,
1034 {
1035 if let (Some(registry), Some(resume_key)) = (
1039 &self.session_registry,
1040 self.handshake_result.peer_resume_key,
1041 ) {
1042 if let Some(handle) = registry.get(&resume_key) {
1043 let (tx, rx) = self.conduit.split();
1044 if let Err(error) = handle
1045 .resume_parts(Arc::new(tx), Box::new(rx), self.handshake_result)
1046 .await
1047 {
1048 registry.remove(&resume_key);
1049 return Err(error);
1050 }
1051 return Ok(SessionAcceptOutcome::Resumed);
1052 }
1053 return Err(SessionError::Protocol("unknown session resume key".into()));
1054 }
1055
1056 let (client, session_handle) = self.establish(handler).await?;
1057 Ok(SessionAcceptOutcome::Established(client, session_handle))
1058 }
1059}
1060
/// Builder for the acceptor side of a session over a single link `L`, with
/// transport acceptance and handshake deferred to `establish`.
pub struct SessionTransportAcceptorBuilder<'a, L: Link> {
    /// Link the transport is accepted on.
    link: L,
    /// Settings offered during the handshake.
    root_settings: ConnectionSettings,
    /// Metadata forwarded when establishing.
    metadata: Metadata<'a>,
    /// Optional acceptor passed through to the session.
    on_connection: Option<Box<dyn ConnectionAcceptor>>,
    /// Optional keepalive configuration passed through to the session.
    keepalive: Option<SessionKeepaliveConfig>,
    /// Resumability flag; `new` initialises it to `true`.
    resumable: bool,
    /// Optional registry of resumable sessions.
    session_registry: Option<SessionRegistry>,
    /// Optional operation store for the driver.
    operation_store: Option<Arc<dyn OperationStore>>,
    /// Callback used to run the session future.
    spawn_fn: SpawnFn,
}
1072
1073impl<'a, L: Link> SessionTransportAcceptorBuilder<'a, L> {
1074 fn new(link: L) -> Self {
1075 Self {
1076 link,
1077 root_settings: ConnectionSettings {
1078 parity: Parity::Even,
1079 max_concurrent_requests: 64,
1080 },
1081 metadata: vec![],
1082 on_connection: None,
1083 keepalive: None,
1084 resumable: true,
1085 session_registry: None,
1086 operation_store: None,
1087 spawn_fn: default_spawn_fn(),
1088 }
1089 }
1090
1091 pub fn root_settings(mut self, settings: ConnectionSettings) -> Self {
1092 self.root_settings = settings;
1093 self
1094 }
1095
1096 pub fn max_concurrent_requests(mut self, max_concurrent_requests: u32) -> Self {
1097 self.root_settings.max_concurrent_requests = max_concurrent_requests;
1098 self
1099 }
1100
1101 pub fn metadata(mut self, metadata: Metadata<'a>) -> Self {
1102 self.metadata = metadata;
1103 self
1104 }
1105
1106 pub fn on_connection(mut self, acceptor: impl ConnectionAcceptor) -> Self {
1107 self.on_connection = Some(Box::new(acceptor));
1108 self
1109 }
1110
1111 pub fn keepalive(mut self, keepalive: SessionKeepaliveConfig) -> Self {
1112 self.keepalive = Some(keepalive);
1113 self
1114 }
1115
1116 pub fn resumable(mut self) -> Self {
1117 self.resumable = true;
1118 self
1119 }
1120
1121 pub fn session_registry(mut self, session_registry: SessionRegistry) -> Self {
1122 self.session_registry = Some(session_registry);
1123 self
1124 }
1125
1126 pub fn operation_store(mut self, operation_store: Arc<dyn OperationStore>) -> Self {
1127 self.operation_store = Some(operation_store);
1128 self
1129 }
1130
1131 #[cfg(not(target_arch = "wasm32"))]
1132 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + Send + 'static) -> Self {
1133 self.spawn_fn = Box::new(f);
1134 self
1135 }
1136
1137 #[cfg(target_arch = "wasm32")]
1138 pub fn spawn_fn(mut self, f: impl FnOnce(BoxSessionFuture) + 'static) -> Self {
1139 self.spawn_fn = Box::new(f);
1140 self
1141 }
1142
1143 #[moire::instrument]
1144 #[cfg(not(target_arch = "wasm32"))]
1145 pub async fn establish<Client: From<DriverCaller>>(
1146 self,
1147 handler: impl Handler<DriverReplySink> + 'static,
1148 ) -> Result<(Client, SessionHandle), SessionError>
1149 where
1150 L: Link + Send + 'static,
1151 L::Tx: MaybeSend + MaybeSync + 'static,
1152 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1153 L::Rx: MaybeSend + 'static,
1154 {
1155 let Self {
1156 link,
1157 root_settings,
1158 metadata,
1159 on_connection,
1160 keepalive,
1161 resumable,
1162 session_registry,
1163 operation_store,
1164 spawn_fn,
1165 } = self;
1166 let (mode, mut link) = accept_transport(link)
1167 .await
1168 .map_err(session_error_from_transport)?;
1169 match mode {
1170 TransportMode::Bare => {
1171 let handshake_result = handshake_as_acceptor(
1172 &link.tx,
1173 &mut link.rx,
1174 root_settings.clone(),
1175 true,
1176 resumable,
1177 None,
1178 )
1179 .await
1180 .map_err(session_error_from_handshake)?;
1181 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1182 .map_err(SessionError::Protocol)?;
1183 let builder = SessionAcceptorBuilder::new(
1184 BareConduit::with_message_plan(link, message_plan),
1185 handshake_result,
1186 );
1187 Self::apply_common_parts(
1188 builder,
1189 root_settings,
1190 metadata,
1191 on_connection,
1192 keepalive,
1193 resumable,
1194 session_registry,
1195 operation_store,
1196 spawn_fn,
1197 )
1198 .establish(handler)
1199 .await
1200 }
1201 TransportMode::Stable => {
1202 Self::finish_with_stable_parts(
1203 link,
1204 root_settings,
1205 metadata,
1206 on_connection,
1207 keepalive,
1208 resumable,
1209 session_registry,
1210 operation_store,
1211 spawn_fn,
1212 handler,
1213 )
1214 .await
1215 }
1216 }
1217 }
1218
1219 #[moire::instrument]
1220 #[cfg(not(target_arch = "wasm32"))]
1221 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1222 self,
1223 handler: impl Handler<DriverReplySink> + 'static,
1224 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1225 where
1226 L: Link + Send + 'static,
1227 L::Tx: MaybeSend + MaybeSync + 'static,
1228 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1229 L::Rx: MaybeSend + 'static,
1230 {
1231 let Self {
1232 link,
1233 root_settings,
1234 metadata,
1235 on_connection,
1236 keepalive,
1237 resumable,
1238 session_registry,
1239 operation_store,
1240 spawn_fn,
1241 } = self;
1242 let (mode, mut link) = accept_transport(link)
1243 .await
1244 .map_err(session_error_from_transport)?;
1245 match mode {
1246 TransportMode::Bare => {
1247 let handshake_result = handshake_as_acceptor(
1248 &link.tx,
1249 &mut link.rx,
1250 root_settings.clone(),
1251 true,
1252 resumable,
1253 None,
1254 )
1255 .await
1256 .map_err(session_error_from_handshake)?;
1257 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1258 .map_err(SessionError::Protocol)?;
1259 let builder = SessionAcceptorBuilder::new(
1260 BareConduit::with_message_plan(link, message_plan),
1261 handshake_result,
1262 );
1263 Self::apply_common_parts(
1264 builder,
1265 root_settings,
1266 metadata,
1267 on_connection,
1268 keepalive,
1269 resumable,
1270 session_registry,
1271 operation_store,
1272 spawn_fn,
1273 )
1274 .establish_or_resume(handler)
1275 .await
1276 }
1277 TransportMode::Stable => Self::finish_with_stable_parts(
1278 link,
1279 root_settings,
1280 metadata,
1281 on_connection,
1282 keepalive,
1283 resumable,
1284 session_registry,
1285 operation_store,
1286 spawn_fn,
1287 handler,
1288 )
1289 .await
1290 .map(|(client, handle)| SessionAcceptOutcome::Established(client, handle)),
1291 }
1292 }
1293
1294 #[cfg(target_arch = "wasm32")]
1295 pub async fn establish<Client: From<DriverCaller>>(
1296 self,
1297 handler: impl Handler<DriverReplySink> + 'static,
1298 ) -> Result<(Client, SessionHandle), SessionError>
1299 where
1300 L: Link + 'static,
1301 L::Tx: MaybeSend + MaybeSync + 'static,
1302 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1303 L::Rx: MaybeSend + 'static,
1304 {
1305 let Self {
1306 link,
1307 root_settings,
1308 metadata,
1309 on_connection,
1310 keepalive,
1311 resumable,
1312 session_registry,
1313 operation_store,
1314 spawn_fn,
1315 } = self;
1316 let (mode, mut link) = accept_transport(link)
1317 .await
1318 .map_err(session_error_from_transport)?;
1319 match mode {
1320 TransportMode::Bare => {
1321 let handshake_result = handshake_as_acceptor(
1322 &link.tx,
1323 &mut link.rx,
1324 root_settings.clone(),
1325 true,
1326 resumable,
1327 None,
1328 )
1329 .await
1330 .map_err(session_error_from_handshake)?;
1331 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1332 .map_err(SessionError::Protocol)?;
1333 let builder = SessionAcceptorBuilder::new(
1334 BareConduit::with_message_plan(link, message_plan),
1335 handshake_result,
1336 );
1337 Self::apply_common_parts(
1338 builder,
1339 root_settings,
1340 metadata,
1341 on_connection,
1342 keepalive,
1343 resumable,
1344 session_registry,
1345 operation_store,
1346 spawn_fn,
1347 )
1348 .establish(handler)
1349 .await
1350 }
1351 TransportMode::Stable => Err(SessionError::Protocol(
1352 "stable conduit transport selection is unsupported on wasm".into(),
1353 )),
1354 }
1355 }
1356
1357 #[cfg(target_arch = "wasm32")]
1358 pub async fn establish_or_resume<Client: From<DriverCaller>>(
1359 self,
1360 handler: impl Handler<DriverReplySink> + 'static,
1361 ) -> Result<SessionAcceptOutcome<Client>, SessionError>
1362 where
1363 L: Link + 'static,
1364 L::Tx: MaybeSend + MaybeSync + 'static,
1365 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1366 L::Rx: MaybeSend + 'static,
1367 {
1368 let Self {
1369 link,
1370 root_settings,
1371 metadata,
1372 on_connection,
1373 keepalive,
1374 resumable,
1375 session_registry,
1376 operation_store,
1377 spawn_fn,
1378 } = self;
1379 let (mode, mut link) = accept_transport(link)
1380 .await
1381 .map_err(session_error_from_transport)?;
1382 match mode {
1383 TransportMode::Bare => {
1384 let handshake_result = handshake_as_acceptor(
1385 &link.tx,
1386 &mut link.rx,
1387 root_settings.clone(),
1388 true,
1389 resumable,
1390 None,
1391 )
1392 .await
1393 .map_err(session_error_from_handshake)?;
1394 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1395 .map_err(SessionError::Protocol)?;
1396 let builder = SessionAcceptorBuilder::new(
1397 BareConduit::with_message_plan(link, message_plan),
1398 handshake_result,
1399 );
1400 Self::apply_common_parts(
1401 builder,
1402 root_settings,
1403 metadata,
1404 on_connection,
1405 keepalive,
1406 resumable,
1407 session_registry,
1408 operation_store,
1409 spawn_fn,
1410 )
1411 .establish_or_resume(handler)
1412 .await
1413 }
1414 TransportMode::Stable => Err(SessionError::Protocol(
1415 "stable conduit transport selection is unsupported on wasm".into(),
1416 )),
1417 }
1418 }
1419
1420 #[cfg(not(target_arch = "wasm32"))]
1421 #[allow(clippy::too_many_arguments)]
1422 async fn finish_with_stable_parts<Client: From<DriverCaller>>(
1423 mut link: SplitLink<L::Tx, L::Rx>,
1424 root_settings: ConnectionSettings,
1425 metadata: Metadata<'a>,
1426 on_connection: Option<Box<dyn ConnectionAcceptor>>,
1427 keepalive: Option<SessionKeepaliveConfig>,
1428 resumable: bool,
1429 session_registry: Option<SessionRegistry>,
1430 operation_store: Option<Arc<dyn OperationStore>>,
1431 spawn_fn: SpawnFn,
1432 handler: impl Handler<DriverReplySink> + 'static,
1433 ) -> Result<(Client, SessionHandle), SessionError>
1434 where
1435 L: Link + Send + 'static,
1436 L::Tx: MaybeSend + MaybeSync + Send + 'static,
1437 <L::Tx as vox_types::LinkTx>::Permit: MaybeSend,
1438 L::Rx: MaybeSend + Send + 'static,
1439 {
1440 let handshake_result = handshake_as_acceptor(
1441 &link.tx,
1442 &mut link.rx,
1443 root_settings.clone(),
1444 true,
1445 resumable,
1446 None,
1447 )
1448 .await
1449 .map_err(session_error_from_handshake)?;
1450 let message_plan = crate::MessagePlan::from_handshake(&handshake_result)
1451 .map_err(SessionError::Protocol)?;
1452 let client_hello = crate::stable_conduit::recv_client_hello(&mut link.rx)
1455 .await
1456 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?;
1457 let conduit = StableConduit::<MessageFamily, _>::with_first_link(
1458 link.tx,
1459 link.rx,
1460 Some(client_hello),
1461 crate::stable_conduit::exhausted_source::<SplitLink<L::Tx, L::Rx>>(),
1462 )
1463 .await
1464 .map_err(|e| SessionError::Protocol(format!("stable conduit setup failed: {e}")))?
1465 .with_message_plan(message_plan);
1466 let builder = SessionAcceptorBuilder::new(conduit, handshake_result);
1467 Self::apply_common_parts(
1468 builder,
1469 root_settings,
1470 metadata,
1471 on_connection,
1472 keepalive,
1473 resumable,
1474 session_registry,
1475 operation_store,
1476 spawn_fn,
1477 )
1478 .establish(handler)
1479 .await
1480 }
1481
1482 #[allow(clippy::too_many_arguments)]
1483 #[allow(clippy::too_many_arguments)]
1484 fn apply_common_parts<C>(
1485 mut builder: SessionAcceptorBuilder<'a, C>,
1486 root_settings: ConnectionSettings,
1487 metadata: Metadata<'a>,
1488 on_connection: Option<Box<dyn ConnectionAcceptor>>,
1489 keepalive: Option<SessionKeepaliveConfig>,
1490 resumable: bool,
1491 session_registry: Option<SessionRegistry>,
1492 operation_store: Option<Arc<dyn OperationStore>>,
1493 spawn_fn: SpawnFn,
1494 ) -> SessionAcceptorBuilder<'a, C> {
1495 builder.root_settings = root_settings;
1496 builder.metadata = metadata;
1497 builder.on_connection = on_connection;
1498 builder.keepalive = keepalive;
1499 builder.resumable = resumable;
1500 builder.session_registry = session_registry;
1501 builder.operation_store = operation_store;
1502 builder.spawn_fn = spawn_fn;
1503 builder
1504 }
1505}
1506
1507fn validate_negotiated_root_settings(
1508 expected_root_settings: &ConnectionSettings,
1509 handshake_result: &HandshakeResult,
1510) -> Result<(), SessionError> {
1511 if handshake_result.our_settings != *expected_root_settings {
1512 return Err(SessionError::Protocol(
1513 "negotiated root settings do not match builder settings".into(),
1514 ));
1515 }
1516 Ok(())
1517}
1518
1519fn session_error_from_handshake(error: crate::HandshakeError) -> SessionError {
1520 match error {
1521 crate::HandshakeError::Io(io) => SessionError::Io(io),
1522 crate::HandshakeError::PeerClosed => {
1523 SessionError::Protocol("peer closed during handshake".into())
1524 }
1525 crate::HandshakeError::NotResumable => SessionError::NotResumable,
1526 other => SessionError::Protocol(other.to_string()),
1527 }
1528}
1529
1530fn session_error_from_transport(error: crate::TransportPrologueError) -> SessionError {
1531 match error {
1532 crate::TransportPrologueError::Io(io) => SessionError::Io(io),
1533 crate::TransportPrologueError::LinkDead => {
1534 SessionError::Protocol("link closed during transport prologue".into())
1535 }
1536 crate::TransportPrologueError::Protocol(message) => SessionError::Protocol(message),
1537 crate::TransportPrologueError::Rejected(reason) => {
1538 SessionError::Protocol(format!("transport rejected: {reason}"))
1539 }
1540 }
1541}