1use std::{
2 fmt::Debug,
3 sync::Arc,
4 time::Duration,
5};
6
7use anyhow::{
8 Error,
9 Result,
10};
11use battler_wamp_uri::Uri;
12use battler_wamp_values::{
13 Dictionary,
14 List,
15 Value,
16};
17use log::{
18 debug,
19 error,
20 info,
21 warn,
22};
23use thiserror::Error;
24use tokio::sync::{
25 RwLock,
26 broadcast,
27 mpsc,
28};
29
30use crate::{
31 auth::Identity,
32 core::{
33 error::{
34 BasicError,
35 ChannelTransmittableResult,
36 InteractionError,
37 WampError,
38 },
39 hash::HashMap,
40 id::{
41 Id,
42 IdAllocator,
43 SequentialIdAllocator,
44 },
45 peer_info::{
46 ConnectionType,
47 PeerInfo,
48 },
49 publish_options::PublishOptions,
50 },
51 message::{
52 common::{
53 abort_message_for_error,
54 error_for_request,
55 goodbye_and_out,
56 },
57 message::{
58 ChallengeMessage,
59 InvocationMessage,
60 Message,
61 WelcomeMessage,
62 YieldMessage,
63 },
64 },
65};
66
/// State tracked while a session is being established (after HELLO, before WELCOME).
#[derive(Debug)]
struct EstablishingSessionState {
    /// Realm named in the HELLO message that started the session.
    realm: Uri,
    /// Set once a CHALLENGE message has been received, so a second one can be rejected.
    authenticating: bool,
}
72
/// State tracked for a session that has been established on a realm.
struct EstablishedSessionState {
    /// Session ID taken from the WELCOME message.
    session_id: Id,
    /// Realm carried over from the establishing state.
    realm: Uri,
    /// The WELCOME message that established the session.
    welcome_message: WelcomeMessage,
    /// Event channels, keyed by subscription ID.
    subscriptions: HashMap<Id, Subscription>,
    /// Procedure message channels, keyed by registration ID.
    procedures: HashMap<Id, Procedure>,
    /// Maps an invocation request ID to the registration ID it was invoked on, so that an
    /// INTERRUPT can be routed to the right procedure channel.
    active_invocations: HashMap<Id, Id>,
}
81
82impl Debug for EstablishedSessionState {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 #[derive(Debug)]
85 #[allow(unused)]
86 struct DebugEstablishedSessionState<'a> {
87 session_id: &'a Id,
88 realm: &'a Uri,
89 }
90
91 DebugEstablishedSessionState {
92 session_id: &self.session_id,
93 realm: &self.realm,
94 }
95 .fmt(f)
96 }
97}
98
/// The state of a peer session. A session starts out closed.
#[derive(Debug, Default)]
enum SessionState {
    /// No session exists.
    #[default]
    Closed,
    /// A HELLO was sent and the session is waiting to be welcomed (or challenged).
    Establishing(EstablishingSessionState),
    /// The session was welcomed onto a realm.
    Established(EstablishedSessionState),
    /// A GOODBYE was sent or received and the closing handshake is in progress.
    Closing,
}
107
108impl SessionState {
109 fn is_same_state(&self, other: &Self) -> bool {
110 match (self, other) {
111 (Self::Closed, Self::Closed) => true,
112 (Self::Establishing(_), Self::Establishing(_)) => true,
113 (Self::Established(_), Self::Established(_)) => true,
114 (Self::Closing, Self::Closing) => true,
115 _ => false,
116 }
117 }
118 fn allowed_state_transition(&self, next: &Self) -> bool {
119 match (self, next) {
120 (Self::Closed, Self::Establishing(_)) => true,
121 (Self::Establishing(_), Self::Closed) => true,
122 (Self::Establishing(_), Self::Established(_)) => true,
123 (Self::Established(_), Self::Closing) => true,
124 (Self::Established(_), Self::Closed) => true,
125 (Self::Closing, Self::Closed) => true,
126 _ => false,
127 }
128 }
129}
130
131#[derive(Debug, Default, Clone, PartialEq, Eq)]
133pub struct PublishedEvent {
134 pub arguments: List,
135 pub arguments_keyword: Dictionary,
136 pub options: PublishOptions,
137}
138
/// An event received for a subscription, built from an EVENT message.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ReceivedEvent {
    /// Positional arguments of the published event.
    pub arguments: List,
    /// Keyword arguments of the published event.
    pub arguments_keyword: Dictionary,

    /// Topic reported in the `topic` detail of the EVENT message, when present and a valid URI.
    pub topic: Option<Uri>,
}
147
/// The payload a callee yields for an invocation, sent to the router in a YIELD message.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RpcYield {
    /// Positional arguments of the result.
    pub arguments: List,
    /// Keyword arguments of the result.
    pub arguments_keyword: Dictionary,
}
154
/// Error returned by [`Invocation::progress`] when the invocation was not made with
/// `receive_progress` enabled.
#[derive(Debug, Error)]
#[error("invocation does not support progressive results")]
pub struct ProgressiveResultNotSupportedError;
160
/// An invocation of a registered procedure, built from an INVOCATION message.
///
/// The callee answers it with [`Invocation::respond`] (or one of its shorthands), optionally
/// preceded by progressive results sent with [`Invocation::progress`].
#[derive(Debug, Clone)]
pub struct Invocation {
    /// Positional arguments of the call.
    pub arguments: List,
    /// Keyword arguments of the call.
    pub arguments_keyword: Dictionary,

    /// Value of the `timeout` detail, interpreted as milliseconds; zero when the detail is absent.
    pub timeout: Duration,
    /// Procedure reported in the `procedure` detail, when present and a valid URI.
    pub procedure: Option<Uri>,

    /// Connection type and identity of the caller, read from the invocation details.
    pub peer_info: PeerInfo,

    /// Request ID of the INVOCATION message.
    id: Id,
    /// Channel on which YIELD and ERROR responses are sent.
    message_tx: mpsc::Sender<Message>,
    /// Value of the `receive_progress` detail; gates [`Invocation::progress`].
    receive_progress: bool,
}
176
177impl Invocation {
178 pub fn id(&self) -> Id {
180 self.id
181 }
182
183 pub async fn progress(&self, rpc_yield: RpcYield) -> Result<()> {
187 if !self.receive_progress {
188 return Err(ProgressiveResultNotSupportedError.into());
189 }
190 self.message_tx
191 .send(Message::Yield(YieldMessage {
192 invocation_request: self.id,
193 options: Dictionary::from_iter([("progress".to_owned(), Value::Bool(true))]),
194 arguments: rpc_yield.arguments,
195 arguments_keyword: rpc_yield.arguments_keyword,
196 }))
197 .await
198 .map_err(Error::new)
199 }
200
201 pub async fn respond<E>(self, rpc_yield: Result<RpcYield, E>) -> Result<()>
203 where
204 E: Into<WampError>,
205 {
206 match rpc_yield {
207 Ok(rpc_yield) => {
208 self.message_tx
209 .send(Message::Yield(YieldMessage {
210 invocation_request: self.id,
211 options: Dictionary::default(),
212 arguments: rpc_yield.arguments,
213 arguments_keyword: rpc_yield.arguments_keyword,
214 }))
215 .await?
216 }
217 Err(err) => {
218 self.message_tx
219 .send(error_for_request(
220 &Message::Invocation(InvocationMessage {
221 request: self.id,
222 ..Default::default()
223 }),
224 &Into::<WampError>::into(err).into(),
225 ))
226 .await?
227 }
228 }
229 Ok(())
230 }
231
232 pub async fn respond_ok(self, rpc_yield: RpcYield) -> Result<()> {
234 self.respond::<WampError>(Ok(rpc_yield)).await
235 }
236
237 pub async fn respond_error<E>(self, error: E) -> Result<()>
239 where
240 E: Into<WampError>,
241 {
242 self.respond(Err(error)).await
243 }
244}
245
/// A request to interrupt an active invocation, built from an INTERRUPT message.
#[derive(Debug, Clone)]
pub struct Interrupt {
    /// Request ID of the invocation being interrupted.
    id: Id,
}
251
252impl Interrupt {
253 pub fn id(&self) -> Id {
254 self.id
255 }
256}
257
/// A message delivered to the callee of a registered procedure.
#[derive(Debug, Clone)]
pub enum ProcedureMessage {
    /// The procedure was invoked.
    Invocation(Invocation),
    /// An active invocation of the procedure was interrupted.
    Interrupt(Interrupt),
}
264
265pub(crate) mod peer_session_message {
266 use battler_wamp_uri::Uri;
267 use battler_wamp_values::{
268 Dictionary,
269 List,
270 };
271 use tokio::sync::broadcast;
272
273 use crate::{
274 core::id::Id,
275 message::message::WelcomeMessage,
276 peer::{
277 ReceivedEvent,
278 session::ProcedureMessage,
279 },
280 };
281
282 #[derive(Debug, Clone)]
284 pub struct EstablishedSession {
285 pub realm: Uri,
286 pub welcome_message: WelcomeMessage,
287 }
288
289 #[derive(Debug)]
291 pub struct Subscription {
292 pub request_id: Id,
293 pub subscription_id: Id,
294 pub event_rx: broadcast::Receiver<ReceivedEvent>,
295 }
296
297 impl Clone for Subscription {
298 fn clone(&self) -> Self {
299 Self {
300 request_id: self.request_id,
301 subscription_id: self.subscription_id,
302 event_rx: self.event_rx.resubscribe(),
303 }
304 }
305 }
306
307 #[derive(Debug, Clone)]
309 pub struct Unsubscription {
310 pub request_id: Id,
311 }
312
313 #[derive(Debug, Clone)]
315 pub struct Publication {
316 pub request_id: Id,
317 }
318
319 #[derive(Debug)]
321 pub struct Registration {
322 pub request_id: Id,
323 pub registration_id: Id,
324 pub procedure_message_rx: broadcast::Receiver<ProcedureMessage>,
325 }
326
327 impl Clone for Registration {
328 fn clone(&self) -> Self {
329 Self {
330 request_id: self.request_id,
331 registration_id: self.registration_id,
332 procedure_message_rx: self.procedure_message_rx.resubscribe(),
333 }
334 }
335 }
336
337 #[derive(Debug, Clone)]
339 pub struct Unregistration {
340 pub request_id: Id,
341 }
342
343 #[derive(Debug, Clone, PartialEq, Eq)]
345 pub struct RpcResult {
346 pub request_id: Id,
347 pub arguments: List,
348 pub arguments_keyword: Dictionary,
349 pub progress: bool,
350 }
351}
352
/// Session-side bookkeeping for an active subscription.
#[derive(Debug, Clone)]
struct Subscription {
    /// Sending end of the channel on which received events are delivered.
    event_tx: broadcast::Sender<ReceivedEvent>,
}
357
/// Session-side bookkeeping for a registered procedure.
#[derive(Debug, Clone)]
struct Procedure {
    /// Sending end of the channel on which invocations and interrupts are delivered.
    procedure_tx: broadcast::Sender<ProcedureMessage>,
}
362
/// A handle onto a [`Session`], created by [`Session::session_handle`].
///
/// It shares the session's state and ID allocator, and holds a receiver for each of the
/// session's broadcast channels.
pub struct SessionHandle {
    /// Session state shared with the owning [`Session`].
    state: Arc<RwLock<SessionState>>,
    /// ID allocator shared with the owning [`Session`].
    id_allocator: Arc<Box<dyn IdAllocator>>,

    /// Receives the outcome of session establishment.
    established_session_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::EstablishedSession>>,
    /// Receives a notification whenever the session transitions to closed.
    closed_session_rx: broadcast::Receiver<()>,

    /// Receives CHALLENGE messages sent by the router during establishment.
    auth_challenge_rx: broadcast::Receiver<ChallengeMessage>,

    /// Receives responses to SUBSCRIBE requests.
    subscribed_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Subscription>>,
    /// Receives responses to UNSUBSCRIBE requests.
    unsubscribed_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unsubscription>>,
    /// Receives responses to PUBLISH requests.
    published_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Publication>>,
    /// Receives responses to REGISTER requests.
    registered_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Registration>>,
    /// Receives responses to UNREGISTER requests.
    unregistered_rx:
        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unregistration>>,
    /// Receives results of CALL requests.
    rpc_result_rx: broadcast::Receiver<ChannelTransmittableResult<peer_session_message::RpcResult>>,
}
386
387impl SessionHandle {
388 pub async fn current_session_id(&self) -> Option<Id> {
393 match &*self.state.read().await {
394 SessionState::Established(state) => Some(state.session_id),
395 _ => None,
396 }
397 }
398
399 pub fn id_allocator(&self) -> Arc<Box<dyn IdAllocator>> {
401 self.id_allocator.clone()
402 }
403
404 pub fn established_session_rx(
407 &self,
408 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::EstablishedSession>>
409 {
410 self.established_session_rx.resubscribe()
411 }
412
413 pub fn auth_challenge_rx(&self) -> broadcast::Receiver<ChallengeMessage> {
415 self.auth_challenge_rx.resubscribe()
416 }
417
418 pub fn closed_session_rx(&self) -> broadcast::Receiver<()> {
420 self.closed_session_rx.resubscribe()
421 }
422
423 pub fn subscribed_rx(
425 &self,
426 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Subscription>> {
427 self.subscribed_rx.resubscribe()
428 }
429
430 pub fn unsubscribed_rx(
432 &self,
433 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unsubscription>> {
434 self.unsubscribed_rx.resubscribe()
435 }
436
437 pub fn published_rx(
439 &self,
440 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Publication>> {
441 self.published_rx.resubscribe()
442 }
443
444 pub fn registered_rx(
446 &self,
447 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Registration>> {
448 self.registered_rx.resubscribe()
449 }
450
451 pub fn unregistered_rx(
453 &self,
454 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unregistration>> {
455 self.unregistered_rx.resubscribe()
456 }
457
458 pub fn rpc_result_rx(
460 &self,
461 ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::RpcResult>> {
462 self.rpc_result_rx.resubscribe()
463 }
464}
465
/// The peer end of a WAMP session.
///
/// Tracks the session state machine as messages are sent and received, and broadcasts the
/// outcome of router responses on the channels exposed through [`SessionHandle`].
pub struct Session {
    /// Name of the peer, used in log messages.
    name: String,
    /// Channel on which outgoing messages are handed off for delivery.
    service_message_tx: mpsc::Sender<Message>,
    /// Current session state, shared with every [`SessionHandle`].
    state: Arc<RwLock<SessionState>>,
    /// ID allocator, shared with every [`SessionHandle`] and reset when a session is established.
    id_allocator: Arc<Box<dyn IdAllocator>>,

    /// Broadcasts the outcome of session establishment.
    established_session_tx:
        broadcast::Sender<ChannelTransmittableResult<peer_session_message::EstablishedSession>>,
    /// Broadcasts a notification whenever the session transitions to closed.
    closed_session_tx: broadcast::Sender<()>,

    /// Broadcasts CHALLENGE messages received during establishment.
    auth_challenge_tx: broadcast::Sender<ChallengeMessage>,

    /// Broadcasts responses to SUBSCRIBE requests.
    subscribed_tx:
        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Subscription>>,
    /// Broadcasts responses to UNSUBSCRIBE requests.
    unsubscribed_tx:
        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Unsubscription>>,
    /// Broadcasts responses to PUBLISH requests.
    published_tx: broadcast::Sender<ChannelTransmittableResult<peer_session_message::Publication>>,
    /// Broadcasts responses to REGISTER requests.
    registered_tx:
        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Registration>>,
    /// Broadcasts responses to UNREGISTER requests.
    unregistered_tx:
        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Unregistration>>,
    /// Broadcasts results of CALL requests.
    rpc_result_tx: broadcast::Sender<ChannelTransmittableResult<peer_session_message::RpcResult>>,
}
492
493impl Session {
494 pub fn new(name: String, service_message_tx: mpsc::Sender<Message>) -> Self {
496 let id_allocator = SequentialIdAllocator::default();
497 let (established_session_tx, _) = broadcast::channel(16);
498 let (closed_session_tx, _) = broadcast::channel(16);
499 let (auth_challenge_tx, _) = broadcast::channel(16);
500 let (subscribed_tx, _) = broadcast::channel(16);
501 let (unsubscribed_tx, _) = broadcast::channel(16);
502 let (published_tx, _) = broadcast::channel(16);
503 let (registered_tx, _) = broadcast::channel(16);
504 let (unregistered_tx, _) = broadcast::channel(16);
505 let (rpc_result_tx, _) = broadcast::channel(16);
506 Self {
507 name,
508 service_message_tx,
509 state: Arc::new(RwLock::new(SessionState::default())),
510 id_allocator: Arc::new(Box::new(id_allocator)),
511 established_session_tx,
512 closed_session_tx,
513 auth_challenge_tx,
514 subscribed_tx,
515 unsubscribed_tx,
516 published_tx,
517 registered_tx,
518 unregistered_tx,
519 rpc_result_tx,
520 }
521 }
522
523 pub fn name(&self) -> &str {
525 &self.name
526 }
527
528 pub async fn closed(&self) -> bool {
530 match *self.state.read().await {
531 SessionState::Closed => true,
532 _ => false,
533 }
534 }
535
536 pub fn session_handle(&self) -> SessionHandle {
539 SessionHandle {
540 state: self.state.clone(),
541 id_allocator: self.id_allocator.clone(),
542 established_session_rx: self.established_session_tx.subscribe(),
543 closed_session_rx: self.closed_session_tx.subscribe(),
544 auth_challenge_rx: self.auth_challenge_tx.subscribe(),
545 subscribed_rx: self.subscribed_tx.subscribe(),
546 unsubscribed_rx: self.unsubscribed_tx.subscribe(),
547 published_rx: self.published_tx.subscribe(),
548 registered_rx: self.registered_tx.subscribe(),
549 unregistered_rx: self.unregistered_tx.subscribe(),
550 rpc_result_rx: self.rpc_result_tx.subscribe(),
551 }
552 }
553
554 async fn get_from_establishing_session_state<F, T>(&self, f: F) -> Result<T, Error>
555 where
556 F: Fn(&EstablishingSessionState) -> T,
557 {
558 match &*self.state.read().await {
559 SessionState::Establishing(state) => Ok(f(&state)),
560 _ => Err(Error::msg("session is not in the establishing state")),
561 }
562 }
563
564 async fn modify_establishing_session_state<F, T>(&self, f: F) -> Result<T, Error>
565 where
566 F: FnOnce(&mut EstablishingSessionState) -> T,
567 T: 'static,
568 {
569 match &mut *self.state.write().await {
570 SessionState::Establishing(state) => Ok(f(state)),
571 _ => Err(Error::msg("session is not in the establishing state")),
572 }
573 }
574
575 async fn get_from_established_session_state<F, T>(&self, f: F) -> Result<T, Error>
576 where
577 F: Fn(&EstablishedSessionState) -> T,
578 T: 'static,
579 {
580 match &*self.state.read().await {
581 SessionState::Established(state) => Ok(f(&state)),
582 _ => Err(Error::msg("session is not in the established state")),
583 }
584 }
585
586 async fn modify_established_session_state<F, T>(&self, f: F) -> Result<T, Error>
587 where
588 F: FnOnce(&mut EstablishedSessionState) -> T,
589 T: 'static,
590 {
591 match &mut *self.state.write().await {
592 SessionState::Established(state) => Ok(f(state)),
593 _ => Err(Error::msg("session is not in the established state")),
594 }
595 }
596
597 pub async fn send_message(&self, message: Message) -> Result<()> {
602 match self.transition_state_from_sending_message(&message).await {
603 Ok(()) => (),
604 Err(err) => {
605 self.established_session_tx.send(Err((&err).into())).ok();
608 return Err(err);
609 }
610 }
611 self.service_message_tx
612 .send(message)
613 .await
614 .map_err(Error::new)
615 }
616
617 async fn transition_state_from_sending_message(&self, message: &Message) -> Result<()> {
618 match message {
619 Message::Hello(message) => {
620 self.transition_state(SessionState::Establishing(EstablishingSessionState {
621 realm: message.realm.clone(),
622 authenticating: false,
623 }))
624 .await
625 }
626 Message::Abort(_) => {
627 self.transition_state(SessionState::Closed).await?;
628
629 self.established_session_tx
632 .send(Err(Error::msg("session closed").into()))
633 .ok();
634 Ok(())
635 }
636 Message::Goodbye(_) => {
637 let next_state = match &*self.state.read().await {
638 SessionState::Closing => SessionState::Closed,
639 _ => SessionState::Closing,
640 };
641 self.transition_state(next_state).await
642 }
643 Message::Unsubscribe(message) => {
644 self.modify_established_session_state(|state| {
645 state.subscriptions.remove(&message.subscribed_subscription)
646 })
647 .await?;
648 Ok(())
649 }
650 Message::Unregister(message) => {
651 self.modify_established_session_state(|state| {
652 state.procedures.remove(&message.registered_registration)
653 })
654 .await?;
655 Ok(())
656 }
657 Message::Yield(message) => {
658 self.modify_established_session_state(|state| {
659 state.active_invocations.remove(&message.invocation_request)
660 })
661 .await?;
662 Ok(())
663 }
664 Message::Error(message) => {
665 self.modify_established_session_state(|state| {
666 state.active_invocations.remove(&message.request)
667 })
668 .await?;
669 Ok(())
670 }
671 _ => Ok(()),
672 }
673 }
674
675 pub async fn handle_message(&self, message: Message) -> Result<()> {
677 debug!("Peer {} received message: {message:?}", self.name);
678 if let Err(err) = self.handle_message_on_state_machine(message).await {
679 if !self.state.read().await.is_same_state(&SessionState::Closed) {
680 self.send_message(abort_message_for_error(&err)).await?;
681 }
682 return Err(err);
683 }
684 Ok(())
685 }
686
687 async fn handle_message_on_state_machine(&self, message: Message) -> Result<()> {
688 let mut establishing = false;
690 let mut closing = false;
691 let mut closed = false;
692 match *self.state.read().await {
693 SessionState::Establishing(_) => establishing = true,
694 SessionState::Closed => closed = true,
695 SessionState::Closing => closing = true,
696 _ => (),
697 }
698
699 if closed {
700 Err(InteractionError::ProtocolViolation(format!(
701 "received {} message on a closed session",
702 message.message_name()
703 ))
704 .into())
705 } else if establishing {
706 let result = self.handle_establishing(message).await;
707 if let Err(err) = &result {
708 self.transition_state(SessionState::Closed).await?;
709 self.established_session_tx.send(Err(err.into())).ok();
710 }
711 result
712 } else if closing {
713 self.handle_closing(message).await
714 } else {
715 self.handle_established(message).await
716 }
717 }
718
719 async fn handle_establishing(&self, message: Message) -> Result<()> {
720 match message {
721 Message::Welcome(message) => {
722 let realm = self
723 .get_from_establishing_session_state(|state| state.realm.clone())
724 .await?;
725
726 self.transition_state(SessionState::Established(EstablishedSessionState {
727 session_id: message.session,
728 realm,
729 welcome_message: message,
730 subscriptions: HashMap::default(),
731 procedures: HashMap::default(),
732 active_invocations: HashMap::default(),
733 }))
734 .await
735 }
736 Message::Challenge(message) => {
737 let authenticating = self
738 .get_from_establishing_session_state(|state| state.authenticating)
739 .await?;
740 if authenticating {
741 return Err(Error::msg("duplicate challenge received"));
742 }
743 self.modify_establishing_session_state(|state| state.authenticating = true)
744 .await?;
745 self.auth_challenge_tx.send(message)?;
746 Ok(())
747 }
748 message @ Message::Abort(_) => Err((&message).into()),
749 _ => Err(InteractionError::ProtocolViolation(format!(
750 "received {} message on an establishing session",
751 message.message_name()
752 ))
753 .into()),
754 }
755 }
756
757 async fn handle_established(&self, message: Message) -> Result<()> {
758 match message {
759 Message::Abort(_) => {
760 warn!(
761 "Peer session {} for {} aborted by peer: {message:?}",
762 self.get_from_established_session_state(|state| state.session_id)
763 .await?,
764 self.name
765 );
766 self.transition_state(SessionState::Closed).await
767 }
768 Message::Goodbye(_) => {
769 self.transition_state(SessionState::Closing).await?;
770 self.send_message(goodbye_and_out()).await
771 }
772 ref message @ Message::Error(ref error_message) => {
773 match error_message.request_type {
774 Message::SUBSCRIBE_TAG => {
775 self.subscribed_tx.send(Err(message.try_into()?))?;
776 }
777 Message::UNSUBSCRIBE_TAG => {
778 self.unsubscribed_tx.send(Err(message.try_into()?))?;
779 }
780 Message::PUBLISH_TAG => {
781 self.published_tx.send(Err(message.try_into()?))?;
782 }
783 Message::REGISTER_TAG => {
784 self.registered_tx.send(Err(message.try_into()?))?;
785 }
786 Message::UNREGISTER_TAG => {
787 self.registered_tx.send(Err(message.try_into()?))?;
788 }
789 Message::CALL_TAG => {
790 self.rpc_result_tx.send(Err(message.try_into()?))?;
791 }
792 _ => {
793 error!(
794 "Invalid ERROR message with request type {} received from the router: {error_message:?}",
795 error_message.request_type
796 );
797 return Err(
798 BasicError::InvalidArgument("invalid request type".to_owned()).into(),
799 );
800 }
801 }
802 Ok(())
803 }
804 Message::Subscribed(message) => {
805 let (event_tx, event_rx) = broadcast::channel(16);
806 self.modify_established_session_state(|state| {
807 state
808 .subscriptions
809 .insert(message.subscription, Subscription { event_tx })
810 })
811 .await?;
812 self.subscribed_tx
813 .send(Ok(peer_session_message::Subscription {
814 request_id: message.subscribe_request,
815 subscription_id: message.subscription,
816 event_rx,
817 }))?;
818 Ok(())
819 }
820 Message::Unsubscribed(message) => {
821 self.unsubscribed_tx
822 .send(Ok(peer_session_message::Unsubscription {
823 request_id: message.unsubscribe_request,
824 }))?;
825 Ok(())
826 }
827 Message::Published(message) => {
828 self.published_tx
829 .send(Ok(peer_session_message::Publication {
830 request_id: message.publish_request,
831 }))?;
832 Ok(())
833 }
834 Message::Event(message) => {
835 let subscription = match self
836 .get_from_established_session_state(|state| {
837 state
838 .subscriptions
839 .get(&message.subscribed_subscription)
840 .cloned()
841 })
842 .await?
843 {
844 Some(subscription) => subscription,
845 None => return Ok(()),
846 };
847 let reported_topic = message
848 .details
849 .get("topic")
850 .and_then(|val| val.string())
851 .and_then(|val| Uri::try_from(val).ok());
852 subscription.event_tx.send(ReceivedEvent {
853 arguments: message.publish_arguments,
854 arguments_keyword: message.publish_arguments_keyword,
855 topic: reported_topic,
856 })?;
857 Ok(())
858 }
859 Message::Registered(message) => {
860 let (procedure_tx, procedure_message_rx) = broadcast::channel(16);
861 self.modify_established_session_state(|state| {
862 state
863 .procedures
864 .insert(message.registration, Procedure { procedure_tx })
865 })
866 .await?;
867 self.registered_tx
868 .send(Ok(peer_session_message::Registration {
869 request_id: message.register_request,
870 registration_id: message.registration,
871 procedure_message_rx,
872 }))?;
873 Ok(())
874 }
875 Message::Unregistered(message) => {
876 self.unregistered_tx
877 .send(Ok(peer_session_message::Unregistration {
878 request_id: message.unregister_request,
879 }))?;
880 Ok(())
881 }
882 Message::Invocation(message) => {
883 let procedure = match self
884 .get_from_established_session_state(|state| {
885 state
886 .procedures
887 .get(&message.registered_registration)
888 .cloned()
889 })
890 .await?
891 {
892 Some(procedure) => procedure,
893 None => return Ok(()),
894 };
895 self.modify_established_session_state(|state| {
896 state
897 .active_invocations
898 .insert(message.request, message.registered_registration)
899 })
900 .await?;
901 let receive_progress = message
902 .details
903 .get("receive_progress")
904 .and_then(|val| val.bool())
905 .unwrap_or(false);
906 let timeout = message
907 .details
908 .get("timeout")
909 .and_then(|val| val.integer())
910 .unwrap_or(0);
911 let timeout = Duration::from_millis(timeout);
912 let reported_procedure = message
913 .details
914 .get("procedure")
915 .and_then(|val| val.string())
916 .and_then(|val| Uri::try_from(val).ok());
917
918 let connection_type = if message
919 .details
920 .get("battler_wamp_direct_peer")
921 .is_some_and(|val| val.bool().is_some_and(|val| val))
922 {
923 ConnectionType::Direct
924 } else {
925 ConnectionType::Remote(
926 message
927 .details
928 .get("battler_wamp_remote_addr")
929 .map(|val| val.string())
930 .flatten()
931 .unwrap_or_default()
932 .to_owned(),
933 )
934 };
935
936 let mut identity = Identity::default();
937 if let Some(auth_id) = message.details.get("caller_authid") {
938 identity.id = auth_id.string().unwrap_or_default().to_owned();
939 }
940 if let Some(auth_role) = message.details.get("caller_authrole") {
941 identity.role = auth_role.string().unwrap_or_default().to_owned();
942 }
943
944 let peer_info = PeerInfo {
945 connection_type,
946 identity,
947 };
948
949 procedure
950 .procedure_tx
951 .send(ProcedureMessage::Invocation(Invocation {
952 arguments: message.call_arguments,
953 arguments_keyword: message.call_arguments_keyword,
954 timeout,
955 procedure: reported_procedure,
956 peer_info,
957 id: message.request,
958 message_tx: self.service_message_tx.clone(),
959 receive_progress,
960 }))?;
961 Ok(())
962 }
963 Message::Result(message) => {
964 let progress = message
965 .details
966 .get("progress")
967 .and_then(|val| val.bool())
968 .unwrap_or(false);
969 self.rpc_result_tx
970 .send(Ok(peer_session_message::RpcResult {
971 request_id: message.call_request,
972 arguments: message.yield_arguments,
973 arguments_keyword: message.yield_arguments_keyword,
974 progress,
975 }))?;
976 Ok(())
977 }
978 Message::Interrupt(message) => {
979 let procedure = match self
980 .get_from_established_session_state(|state| {
981 state
982 .active_invocations
983 .get(&message.invocation_request)
984 .and_then(|id| state.procedures.get(&id))
985 .cloned()
986 })
987 .await?
988 {
989 Some(procedure) => procedure,
990 None => return Ok(()),
991 };
992 procedure
993 .procedure_tx
994 .send(ProcedureMessage::Interrupt(Interrupt {
995 id: message.invocation_request,
996 }))?;
997 Ok(())
998 }
999 _ => Err(InteractionError::ProtocolViolation(format!(
1000 "received {} message on an established session",
1001 message.message_name()
1002 ))
1003 .into()),
1004 }
1005 }
1006
1007 async fn handle_closing(&self, message: Message) -> Result<()> {
1008 match message {
1009 Message::Goodbye(_) => self.transition_state(SessionState::Closed).await,
1010 _ => Err(InteractionError::ProtocolViolation(format!(
1011 "received {} message on a closing session",
1012 message.message_name()
1013 ))
1014 .into()),
1015 }
1016 }
1017
1018 async fn validate_state_transition(&self, state: &SessionState) -> Result<bool> {
1019 let current_state = self.state.read().await;
1020 if current_state.is_same_state(state) {
1021 return Ok(true);
1022 }
1023
1024 if !current_state.allowed_state_transition(&state) {
1025 return Err(BasicError::Internal(format!(
1026 "invalid state transition from {:?} to {state:?}",
1027 self.state
1028 ))
1029 .into());
1030 }
1031
1032 Ok(false)
1033 }
1034
1035 async fn transition_state(&self, state: SessionState) -> Result<()> {
1036 if self.validate_state_transition(&state).await? {
1037 return Ok(());
1038 }
1039
1040 debug!(
1041 "Peer {} transitioned from {:?} to {state:?}",
1042 self.name,
1043 self.state.read().await
1044 );
1045 *self.state.write().await = state;
1046
1047 match &*self.state.read().await {
1048 SessionState::Established(state) => {
1049 info!(
1050 "Peer {} established session {} on realm {}",
1051 self.name, state.session_id, state.realm
1052 );
1053 self.id_allocator.reset().await;
1054 self.established_session_tx
1055 .send(Ok(peer_session_message::EstablishedSession {
1056 realm: state.realm.clone(),
1057 welcome_message: state.welcome_message.clone(),
1058 }))?;
1059 }
1060 SessionState::Closed => {
1061 self.closed_session_tx.send(())?;
1062 }
1063 _ => (),
1064 }
1065
1066 Ok(())
1067 }
1068}