battler_wamp/peer/
session.rs

1use std::{
2    fmt::Debug,
3    sync::Arc,
4    time::Duration,
5};
6
7use anyhow::{
8    Error,
9    Result,
10};
11use battler_wamp_uri::Uri;
12use battler_wamp_values::{
13    Dictionary,
14    List,
15    Value,
16};
17use log::{
18    debug,
19    error,
20    info,
21    warn,
22};
23use thiserror::Error;
24use tokio::sync::{
25    RwLock,
26    broadcast,
27    mpsc,
28};
29
30use crate::{
31    auth::Identity,
32    core::{
33        error::{
34            BasicError,
35            ChannelTransmittableResult,
36            InteractionError,
37            WampError,
38        },
39        hash::HashMap,
40        id::{
41            Id,
42            IdAllocator,
43            SequentialIdAllocator,
44        },
45        peer_info::{
46            ConnectionType,
47            PeerInfo,
48        },
49        publish_options::PublishOptions,
50    },
51    message::{
52        common::{
53            abort_message_for_error,
54            error_for_request,
55            goodbye_and_out,
56        },
57        message::{
58            ChallengeMessage,
59            InvocationMessage,
60            Message,
61            WelcomeMessage,
62            YieldMessage,
63        },
64    },
65};
66
/// Session state held while the opening handshake is still in progress.
67#[derive(Debug)]
68struct EstablishingSessionState {
    /// Realm taken from the HELLO message that started the handshake.
69    realm: Uri,
    /// Set once a CHALLENGE has been received, so that a second CHALLENGE can be rejected.
70    authenticating: bool,
71}
72
73struct EstablishedSessionState {
74    session_id: Id,
75    realm: Uri,
76    welcome_message: WelcomeMessage,
77    subscriptions: HashMap<Id, Subscription>,
78    procedures: HashMap<Id, Procedure>,
79    active_invocations: HashMap<Id, Id>,
80}
81
82impl Debug for EstablishedSessionState {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        #[derive(Debug)]
85        #[allow(unused)]
86        struct DebugEstablishedSessionState<'a> {
87            session_id: &'a Id,
88            realm: &'a Uri,
89        }
90
91        DebugEstablishedSessionState {
92            session_id: &self.session_id,
93            realm: &self.realm,
94        }
95        .fmt(f)
96    }
97}
98
99#[derive(Debug, Default)]
100enum SessionState {
101    #[default]
102    Closed,
103    Establishing(EstablishingSessionState),
104    Established(EstablishedSessionState),
105    Closing,
106}
107
108impl SessionState {
109    fn is_same_state(&self, other: &Self) -> bool {
110        match (self, other) {
111            (Self::Closed, Self::Closed) => true,
112            (Self::Establishing(_), Self::Establishing(_)) => true,
113            (Self::Established(_), Self::Established(_)) => true,
114            (Self::Closing, Self::Closing) => true,
115            _ => false,
116        }
117    }
118    fn allowed_state_transition(&self, next: &Self) -> bool {
119        match (self, next) {
120            (Self::Closed, Self::Establishing(_)) => true,
121            (Self::Establishing(_), Self::Closed) => true,
122            (Self::Establishing(_), Self::Established(_)) => true,
123            (Self::Established(_), Self::Closing) => true,
124            (Self::Established(_), Self::Closed) => true,
125            (Self::Closing, Self::Closed) => true,
126            _ => false,
127        }
128    }
129}
130
131/// An event published to a topic.
132#[derive(Debug, Default, Clone, PartialEq, Eq)]
133pub struct PublishedEvent {
    /// Positional arguments of the event.
134    pub arguments: List,
    /// Keyword arguments of the event.
135    pub arguments_keyword: Dictionary,
    /// Options that accompany the publication.
136    pub options: PublishOptions,
137}
138
139/// An event received from the router for one of the session's subscriptions.
140#[derive(Debug, Default, Clone, PartialEq, Eq)]
141pub struct ReceivedEvent {
    /// Positional arguments, copied from the EVENT message.
142    pub arguments: List,
    /// Keyword arguments, copied from the EVENT message.
143    pub arguments_keyword: Dictionary,
144
    /// Topic read from the `topic` detail of the EVENT message; `None` when the detail is absent,
    /// is not a string, or is not a valid URI.
145    pub topic: Option<Uri>,
146}
147
148/// The result of an RPC, generated by the callee.
///
/// The two fields are copied unchanged into the YIELD message sent for the invocation.
149#[derive(Debug, Default, Clone, PartialEq, Eq)]
150pub struct RpcYield {
    /// Positional arguments of the result.
151    pub arguments: List,
    /// Keyword arguments of the result.
152    pub arguments_keyword: Dictionary,
153}
154
155/// Error for a callee attempting to respond to an RPC with a progressive result when it is not
156/// supported by the caller.
///
/// Returned by `Invocation::progress` when the invocation was not flagged to receive progress.
157#[derive(Debug, Error)]
158#[error("invocation does not support progressive results")]
159pub struct ProgressiveResultNotSupportedError;
160
161/// An invocation of a procedure.
162#[derive(Debug, Clone)]
163pub struct Invocation {
164    pub arguments: List,
165    pub arguments_keyword: Dictionary,
166
167    pub timeout: Duration,
168    pub procedure: Option<Uri>,
169
170    pub peer_info: PeerInfo,
171
172    id: Id,
173    message_tx: mpsc::Sender<Message>,
174    receive_progress: bool,
175}
176
177impl Invocation {
178    /// The invocation ID.
179    pub fn id(&self) -> Id {
180        self.id
181    }
182
183    /// Responds to the invocation with a progressive result.
184    ///
185    /// Fails if the invocation (a.k.a., the caller) does not support progressive results.
186    pub async fn progress(&self, rpc_yield: RpcYield) -> Result<()> {
187        if !self.receive_progress {
188            return Err(ProgressiveResultNotSupportedError.into());
189        }
190        self.message_tx
191            .send(Message::Yield(YieldMessage {
192                invocation_request: self.id,
193                options: Dictionary::from_iter([("progress".to_owned(), Value::Bool(true))]),
194                arguments: rpc_yield.arguments,
195                arguments_keyword: rpc_yield.arguments_keyword,
196            }))
197            .await
198            .map_err(Error::new)
199    }
200
201    /// Responds to the invocation with a result.
202    pub async fn respond<E>(self, rpc_yield: Result<RpcYield, E>) -> Result<()>
203    where
204        E: Into<WampError>,
205    {
206        match rpc_yield {
207            Ok(rpc_yield) => {
208                self.message_tx
209                    .send(Message::Yield(YieldMessage {
210                        invocation_request: self.id,
211                        options: Dictionary::default(),
212                        arguments: rpc_yield.arguments,
213                        arguments_keyword: rpc_yield.arguments_keyword,
214                    }))
215                    .await?
216            }
217            Err(err) => {
218                self.message_tx
219                    .send(error_for_request(
220                        &Message::Invocation(InvocationMessage {
221                            request: self.id,
222                            ..Default::default()
223                        }),
224                        &Into::<WampError>::into(err).into(),
225                    ))
226                    .await?
227            }
228        }
229        Ok(())
230    }
231
232    /// Responds to the invocation with a successful result.
233    pub async fn respond_ok(self, rpc_yield: RpcYield) -> Result<()> {
234        self.respond::<WampError>(Ok(rpc_yield)).await
235    }
236
237    /// Responds to the invocation with an error.
238    pub async fn respond_error<E>(self, error: E) -> Result<()>
239    where
240        E: Into<WampError>,
241    {
242        self.respond(Err(error)).await
243    }
244}
245
246/// An interrupt of an invocation.
247#[derive(Debug, Clone)]
248pub struct Interrupt {
    // NOTE(review): presumably the ID of the invocation being interrupted; the code that
    // constructs this value is not visible here, so confirm.
249    id: Id,
250}
251
252impl Interrupt {
    /// The ID stored in the interrupt, returned by value.
253    pub fn id(&self) -> Id {
254        self.id
255    }
256}
257
258/// A message for a single procedure that must be strongly ordered.
///
/// Both kinds of message travel over the same per-procedure broadcast channel.
259#[derive(Debug, Clone)]
260pub enum ProcedureMessage {
    /// A new invocation of the procedure.
261    Invocation(Invocation),
    /// An interrupt of an invocation.
262    Interrupt(Interrupt),
263}
264
265pub(crate) mod peer_session_message {
266    use battler_wamp_uri::Uri;
267    use battler_wamp_values::{
268        Dictionary,
269        List,
270    };
271    use tokio::sync::broadcast;
272
273    use crate::{
274        core::id::Id,
275        message::message::WelcomeMessage,
276        peer::{
277            ReceivedEvent,
278            session::ProcedureMessage,
279        },
280    };
281
282    /// The result of establishing a session.
283    #[derive(Debug, Clone)]
284    pub struct EstablishedSession {
285        pub realm: Uri,
286        pub welcome_message: WelcomeMessage,
287    }
288
289    /// A subscription made on a topic.
290    #[derive(Debug)]
291    pub struct Subscription {
292        pub request_id: Id,
293        pub subscription_id: Id,
294        pub event_rx: broadcast::Receiver<ReceivedEvent>,
295    }
296
297    impl Clone for Subscription {
298        fn clone(&self) -> Self {
299            Self {
300                request_id: self.request_id,
301                subscription_id: self.subscription_id,
302                event_rx: self.event_rx.resubscribe(),
303            }
304        }
305    }
306
307    /// A confirmation that a subscription was dropped.
308    #[derive(Debug, Clone)]
309    pub struct Unsubscription {
310        pub request_id: Id,
311    }
312
313    /// A confirmation that an event was published.
314    #[derive(Debug, Clone)]
315    pub struct Publication {
316        pub request_id: Id,
317    }
318
319    /// A confirmation that a procedure was registered.
320    #[derive(Debug)]
321    pub struct Registration {
322        pub request_id: Id,
323        pub registration_id: Id,
324        pub procedure_message_rx: broadcast::Receiver<ProcedureMessage>,
325    }
326
327    impl Clone for Registration {
328        fn clone(&self) -> Self {
329            Self {
330                request_id: self.request_id,
331                registration_id: self.registration_id,
332                procedure_message_rx: self.procedure_message_rx.resubscribe(),
333            }
334        }
335    }
336
337    /// A confirmation that a procedure was deregistered.
338    #[derive(Debug, Clone)]
339    pub struct Unregistration {
340        pub request_id: Id,
341    }
342
343    /// A result of a procedure call.
344    #[derive(Debug, Clone, PartialEq, Eq)]
345    pub struct RpcResult {
346        pub request_id: Id,
347        pub arguments: List,
348        pub arguments_keyword: Dictionary,
349        pub progress: bool,
350    }
351}
352
/// Session-side record of a confirmed subscription, stored by subscription ID.
353#[derive(Debug, Clone)]
354struct Subscription {
    /// Sender used to fan out EVENT messages received for the subscription.
355    event_tx: broadcast::Sender<ReceivedEvent>,
356}
357
/// Session-side record of a confirmed procedure registration, stored by registration ID.
358#[derive(Debug, Clone)]
359struct Procedure {
    /// Sender used to deliver messages for the registered procedure.
360    procedure_tx: broadcast::Sender<ProcedureMessage>,
361}
362
363/// A handle to an asynchronously-running peer session.
364pub struct SessionHandle {
365    state: Arc<RwLock<SessionState>>,
366    id_allocator: Arc<Box<dyn IdAllocator>>,
367
368    established_session_rx:
369        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::EstablishedSession>>,
370    closed_session_rx: broadcast::Receiver<()>,
371
372    auth_challenge_rx: broadcast::Receiver<ChallengeMessage>,
373
374    subscribed_rx:
375        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Subscription>>,
376    unsubscribed_rx:
377        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unsubscription>>,
378    published_rx:
379        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Publication>>,
380    registered_rx:
381        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Registration>>,
382    unregistered_rx:
383        broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unregistration>>,
384    rpc_result_rx: broadcast::Receiver<ChannelTransmittableResult<peer_session_message::RpcResult>>,
385}
386
387impl SessionHandle {
388    /// The current session ID, as given by the router.
389    ///
390    /// Since a peer session is reused across multiple router sessions for the same peer, this ID is
391    /// subject to change at any point.
392    pub async fn current_session_id(&self) -> Option<Id> {
393        match &*self.state.read().await {
394            SessionState::Established(state) => Some(state.session_id),
395            _ => None,
396        }
397    }
398
399    /// A reference to the session's ID generator.
400    pub fn id_allocator(&self) -> Arc<Box<dyn IdAllocator>> {
401        self.id_allocator.clone()
402    }
403
404    /// The receiver channel for establishing a session (moving the session to the ESTABLISHED
405    /// state).
406    pub fn established_session_rx(
407        &self,
408    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::EstablishedSession>>
409    {
410        self.established_session_rx.resubscribe()
411    }
412
413    /// The receiver channel for authentication challenges.
414    pub fn auth_challenge_rx(&self) -> broadcast::Receiver<ChallengeMessage> {
415        self.auth_challenge_rx.resubscribe()
416    }
417
418    /// The receiver channel, populated when the session moves to the CLOSED state.
419    pub fn closed_session_rx(&self) -> broadcast::Receiver<()> {
420        self.closed_session_rx.resubscribe()
421    }
422
423    /// The receiver channel for responses to SUBSCRIBE messages.
424    pub fn subscribed_rx(
425        &self,
426    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Subscription>> {
427        self.subscribed_rx.resubscribe()
428    }
429
430    /// The receiver channel for responses to UNSUBSCRIBE messages.
431    pub fn unsubscribed_rx(
432        &self,
433    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unsubscription>> {
434        self.unsubscribed_rx.resubscribe()
435    }
436
437    /// The receiver channel for responses to PUBLISH messages.
438    pub fn published_rx(
439        &self,
440    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Publication>> {
441        self.published_rx.resubscribe()
442    }
443
444    /// The receiver channel for responses to REGISTER messages.
445    pub fn registered_rx(
446        &self,
447    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Registration>> {
448        self.registered_rx.resubscribe()
449    }
450
451    /// The receiver channel for responses to UNREGISTER messages.
452    pub fn unregistered_rx(
453        &self,
454    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::Unregistration>> {
455        self.unregistered_rx.resubscribe()
456    }
457
458    /// The receiver channel for responses to CALL messages.
459    pub fn rpc_result_rx(
460        &self,
461    ) -> broadcast::Receiver<ChannelTransmittableResult<peer_session_message::RpcResult>> {
462        self.rpc_result_rx.resubscribe()
463    }
464}
465
466/// The peer end of a WAMP session.
467///
468/// Handles WAMP messages in a state machine and holds all session-scoped state.
469pub struct Session {
    /// Name of the session, used in log output.
470    name: String,
    /// Channel that outgoing messages are forwarded to after the session state was updated.
471    service_message_tx: mpsc::Sender<Message>,
    /// Current state of the state machine; shared with every `SessionHandle`.
472    state: Arc<RwLock<SessionState>>,
    /// ID allocator; shared with every `SessionHandle`.
473    id_allocator: Arc<Box<dyn IdAllocator>>,
474
    /// Outcomes of attempts to establish a session. Failures are sent when a state transition or
    /// the handshake fails, or when the session is aborted.
475    established_session_tx:
476        broadcast::Sender<ChannelTransmittableResult<peer_session_message::EstablishedSession>>,
    // NOTE(review): presumably signalled when the session reaches the closed state; the send is
    // not in the visible part of the file, so confirm.
477    closed_session_tx: broadcast::Sender<()>,
478
    /// CHALLENGE messages received while the session is being established.
479    auth_challenge_tx: broadcast::Sender<ChallengeMessage>,
480
    /// Responses to SUBSCRIBE messages (confirmations and errors).
481    subscribed_tx:
482        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Subscription>>,
    /// Responses to UNSUBSCRIBE messages (confirmations and errors).
483    unsubscribed_tx:
484        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Unsubscription>>,
    /// Responses to PUBLISH messages (confirmations and errors).
485    published_tx: broadcast::Sender<ChannelTransmittableResult<peer_session_message::Publication>>,
    /// Responses to REGISTER messages (confirmations and errors).
486    registered_tx:
487        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Registration>>,
    /// Responses to UNREGISTER messages.
488    unregistered_tx:
489        broadcast::Sender<ChannelTransmittableResult<peer_session_message::Unregistration>>,
    /// Responses to CALL messages.
490    rpc_result_tx: broadcast::Sender<ChannelTransmittableResult<peer_session_message::RpcResult>>,
491}
492
493impl Session {
494    /// Creates a new session over a service.
495    pub fn new(name: String, service_message_tx: mpsc::Sender<Message>) -> Self {
496        let id_allocator = SequentialIdAllocator::default();
497        let (established_session_tx, _) = broadcast::channel(16);
498        let (closed_session_tx, _) = broadcast::channel(16);
499        let (auth_challenge_tx, _) = broadcast::channel(16);
500        let (subscribed_tx, _) = broadcast::channel(16);
501        let (unsubscribed_tx, _) = broadcast::channel(16);
502        let (published_tx, _) = broadcast::channel(16);
503        let (registered_tx, _) = broadcast::channel(16);
504        let (unregistered_tx, _) = broadcast::channel(16);
505        let (rpc_result_tx, _) = broadcast::channel(16);
506        Self {
507            name,
508            service_message_tx,
509            state: Arc::new(RwLock::new(SessionState::default())),
510            id_allocator: Arc::new(Box::new(id_allocator)),
511            established_session_tx,
512            closed_session_tx,
513            auth_challenge_tx,
514            subscribed_tx,
515            unsubscribed_tx,
516            published_tx,
517            registered_tx,
518            unregistered_tx,
519            rpc_result_tx,
520        }
521    }
522
523    /// The name of the session.
524    pub fn name(&self) -> &str {
525        &self.name
526    }
527
528    /// Checks if the session is closed.
529    pub async fn closed(&self) -> bool {
530        match *self.state.read().await {
531            SessionState::Closed => true,
532            _ => false,
533        }
534    }
535
536    /// Generates a handle to the session, which can be saved separately from the session's
537    /// lifecycle.
538    pub fn session_handle(&self) -> SessionHandle {
539        SessionHandle {
540            state: self.state.clone(),
541            id_allocator: self.id_allocator.clone(),
542            established_session_rx: self.established_session_tx.subscribe(),
543            closed_session_rx: self.closed_session_tx.subscribe(),
544            auth_challenge_rx: self.auth_challenge_tx.subscribe(),
545            subscribed_rx: self.subscribed_tx.subscribe(),
546            unsubscribed_rx: self.unsubscribed_tx.subscribe(),
547            published_rx: self.published_tx.subscribe(),
548            registered_rx: self.registered_tx.subscribe(),
549            unregistered_rx: self.unregistered_tx.subscribe(),
550            rpc_result_rx: self.rpc_result_tx.subscribe(),
551        }
552    }
553
554    async fn get_from_establishing_session_state<F, T>(&self, f: F) -> Result<T, Error>
555    where
556        F: Fn(&EstablishingSessionState) -> T,
557    {
558        match &*self.state.read().await {
559            SessionState::Establishing(state) => Ok(f(&state)),
560            _ => Err(Error::msg("session is not in the establishing state")),
561        }
562    }
563
564    async fn modify_establishing_session_state<F, T>(&self, f: F) -> Result<T, Error>
565    where
566        F: FnOnce(&mut EstablishingSessionState) -> T,
567        T: 'static,
568    {
569        match &mut *self.state.write().await {
570            SessionState::Establishing(state) => Ok(f(state)),
571            _ => Err(Error::msg("session is not in the establishing state")),
572        }
573    }
574
575    async fn get_from_established_session_state<F, T>(&self, f: F) -> Result<T, Error>
576    where
577        F: Fn(&EstablishedSessionState) -> T,
578        T: 'static,
579    {
580        match &*self.state.read().await {
581            SessionState::Established(state) => Ok(f(&state)),
582            _ => Err(Error::msg("session is not in the established state")),
583        }
584    }
585
586    async fn modify_established_session_state<F, T>(&self, f: F) -> Result<T, Error>
587    where
588        F: FnOnce(&mut EstablishedSessionState) -> T,
589        T: 'static,
590    {
591        match &mut *self.state.write().await {
592            SessionState::Established(state) => Ok(f(state)),
593            _ => Err(Error::msg("session is not in the established state")),
594        }
595    }
596
597    /// Sends a message over the session.
598    ///
599    /// Messages should not be sent directly over the underlying service. By sending messages
600    /// through the session, the session state can be updated accordingly.
601    pub async fn send_message(&self, message: Message) -> Result<()> {
602        match self.transition_state_from_sending_message(&message).await {
603            Ok(()) => (),
604            Err(err) => {
605                // If we failed to transition state, we may need to communicate the error to the
606                // peer waiting for join a realm.
607                self.established_session_tx.send(Err((&err).into())).ok();
608                return Err(err);
609            }
610        }
611        self.service_message_tx
612            .send(message)
613            .await
614            .map_err(Error::new)
615    }
616
617    async fn transition_state_from_sending_message(&self, message: &Message) -> Result<()> {
618        match message {
619            Message::Hello(message) => {
620                self.transition_state(SessionState::Establishing(EstablishingSessionState {
621                    realm: message.realm.clone(),
622                    authenticating: false,
623                }))
624                .await
625            }
626            Message::Abort(_) => {
627                self.transition_state(SessionState::Closed).await?;
628
629                // The peer may be waiting to join a realm, so we must communicate that the session
630                // closed.
631                self.established_session_tx
632                    .send(Err(Error::msg("session closed").into()))
633                    .ok();
634                Ok(())
635            }
636            Message::Goodbye(_) => {
637                let next_state = match &*self.state.read().await {
638                    SessionState::Closing => SessionState::Closed,
639                    _ => SessionState::Closing,
640                };
641                self.transition_state(next_state).await
642            }
643            Message::Unsubscribe(message) => {
644                self.modify_established_session_state(|state| {
645                    state.subscriptions.remove(&message.subscribed_subscription)
646                })
647                .await?;
648                Ok(())
649            }
650            Message::Unregister(message) => {
651                self.modify_established_session_state(|state| {
652                    state.procedures.remove(&message.registered_registration)
653                })
654                .await?;
655                Ok(())
656            }
657            Message::Yield(message) => {
658                self.modify_established_session_state(|state| {
659                    state.active_invocations.remove(&message.invocation_request)
660                })
661                .await?;
662                Ok(())
663            }
664            Message::Error(message) => {
665                self.modify_established_session_state(|state| {
666                    state.active_invocations.remove(&message.request)
667                })
668                .await?;
669                Ok(())
670            }
671            _ => Ok(()),
672        }
673    }
674
675    /// Handles a message over the session state machine.
676    pub async fn handle_message(&self, message: Message) -> Result<()> {
677        debug!("Peer {} received message: {message:?}", self.name);
678        if let Err(err) = self.handle_message_on_state_machine(message).await {
679            if !self.state.read().await.is_same_state(&SessionState::Closed) {
680                self.send_message(abort_message_for_error(&err)).await?;
681            }
682            return Err(err);
683        }
684        Ok(())
685    }
686
687    async fn handle_message_on_state_machine(&self, message: Message) -> Result<()> {
688        // Read state separately from handling the message, so that we don't lock the session state.
689        let mut establishing = false;
690        let mut closing = false;
691        let mut closed = false;
692        match *self.state.read().await {
693            SessionState::Establishing(_) => establishing = true,
694            SessionState::Closed => closed = true,
695            SessionState::Closing => closing = true,
696            _ => (),
697        }
698
699        if closed {
700            Err(InteractionError::ProtocolViolation(format!(
701                "received {} message on a closed session",
702                message.message_name()
703            ))
704            .into())
705        } else if establishing {
706            let result = self.handle_establishing(message).await;
707            if let Err(err) = &result {
708                self.transition_state(SessionState::Closed).await?;
709                self.established_session_tx.send(Err(err.into())).ok();
710            }
711            result
712        } else if closing {
713            self.handle_closing(message).await
714        } else {
715            self.handle_established(message).await
716        }
717    }
718
719    async fn handle_establishing(&self, message: Message) -> Result<()> {
720        match message {
721            Message::Welcome(message) => {
722                let realm = self
723                    .get_from_establishing_session_state(|state| state.realm.clone())
724                    .await?;
725
726                self.transition_state(SessionState::Established(EstablishedSessionState {
727                    session_id: message.session,
728                    realm,
729                    welcome_message: message,
730                    subscriptions: HashMap::default(),
731                    procedures: HashMap::default(),
732                    active_invocations: HashMap::default(),
733                }))
734                .await
735            }
736            Message::Challenge(message) => {
737                let authenticating = self
738                    .get_from_establishing_session_state(|state| state.authenticating)
739                    .await?;
740                if authenticating {
741                    return Err(Error::msg("duplicate challenge received"));
742                }
743                self.modify_establishing_session_state(|state| state.authenticating = true)
744                    .await?;
745                self.auth_challenge_tx.send(message)?;
746                Ok(())
747            }
748            message @ Message::Abort(_) => Err((&message).into()),
749            _ => Err(InteractionError::ProtocolViolation(format!(
750                "received {} message on an establishing session",
751                message.message_name()
752            ))
753            .into()),
754        }
755    }
756
757    async fn handle_established(&self, message: Message) -> Result<()> {
758        match message {
759            Message::Abort(_) => {
760                warn!(
761                    "Peer session {} for {} aborted by peer: {message:?}",
762                    self.get_from_established_session_state(|state| state.session_id)
763                        .await?,
764                    self.name
765                );
766                self.transition_state(SessionState::Closed).await
767            }
768            Message::Goodbye(_) => {
769                self.transition_state(SessionState::Closing).await?;
770                self.send_message(goodbye_and_out()).await
771            }
772            ref message @ Message::Error(ref error_message) => {
773                match error_message.request_type {
774                    Message::SUBSCRIBE_TAG => {
775                        self.subscribed_tx.send(Err(message.try_into()?))?;
776                    }
777                    Message::UNSUBSCRIBE_TAG => {
778                        self.unsubscribed_tx.send(Err(message.try_into()?))?;
779                    }
780                    Message::PUBLISH_TAG => {
781                        self.published_tx.send(Err(message.try_into()?))?;
782                    }
783                    Message::REGISTER_TAG => {
784                        self.registered_tx.send(Err(message.try_into()?))?;
785                    }
786                    Message::UNREGISTER_TAG => {
787                        self.registered_tx.send(Err(message.try_into()?))?;
788                    }
789                    Message::CALL_TAG => {
790                        self.rpc_result_tx.send(Err(message.try_into()?))?;
791                    }
792                    _ => {
793                        error!(
794                            "Invalid ERROR message with request type {} received from the router: {error_message:?}",
795                            error_message.request_type
796                        );
797                        return Err(
798                            BasicError::InvalidArgument("invalid request type".to_owned()).into(),
799                        );
800                    }
801                }
802                Ok(())
803            }
804            Message::Subscribed(message) => {
805                let (event_tx, event_rx) = broadcast::channel(16);
806                self.modify_established_session_state(|state| {
807                    state
808                        .subscriptions
809                        .insert(message.subscription, Subscription { event_tx })
810                })
811                .await?;
812                self.subscribed_tx
813                    .send(Ok(peer_session_message::Subscription {
814                        request_id: message.subscribe_request,
815                        subscription_id: message.subscription,
816                        event_rx,
817                    }))?;
818                Ok(())
819            }
820            Message::Unsubscribed(message) => {
821                self.unsubscribed_tx
822                    .send(Ok(peer_session_message::Unsubscription {
823                        request_id: message.unsubscribe_request,
824                    }))?;
825                Ok(())
826            }
827            Message::Published(message) => {
828                self.published_tx
829                    .send(Ok(peer_session_message::Publication {
830                        request_id: message.publish_request,
831                    }))?;
832                Ok(())
833            }
834            Message::Event(message) => {
835                let subscription = match self
836                    .get_from_established_session_state(|state| {
837                        state
838                            .subscriptions
839                            .get(&message.subscribed_subscription)
840                            .cloned()
841                    })
842                    .await?
843                {
844                    Some(subscription) => subscription,
845                    None => return Ok(()),
846                };
847                let reported_topic = message
848                    .details
849                    .get("topic")
850                    .and_then(|val| val.string())
851                    .and_then(|val| Uri::try_from(val).ok());
852                subscription.event_tx.send(ReceivedEvent {
853                    arguments: message.publish_arguments,
854                    arguments_keyword: message.publish_arguments_keyword,
855                    topic: reported_topic,
856                })?;
857                Ok(())
858            }
859            Message::Registered(message) => {
860                let (procedure_tx, procedure_message_rx) = broadcast::channel(16);
861                self.modify_established_session_state(|state| {
862                    state
863                        .procedures
864                        .insert(message.registration, Procedure { procedure_tx })
865                })
866                .await?;
867                self.registered_tx
868                    .send(Ok(peer_session_message::Registration {
869                        request_id: message.register_request,
870                        registration_id: message.registration,
871                        procedure_message_rx,
872                    }))?;
873                Ok(())
874            }
875            Message::Unregistered(message) => {
876                self.unregistered_tx
877                    .send(Ok(peer_session_message::Unregistration {
878                        request_id: message.unregister_request,
879                    }))?;
880                Ok(())
881            }
882            Message::Invocation(message) => {
883                let procedure = match self
884                    .get_from_established_session_state(|state| {
885                        state
886                            .procedures
887                            .get(&message.registered_registration)
888                            .cloned()
889                    })
890                    .await?
891                {
892                    Some(procedure) => procedure,
893                    None => return Ok(()),
894                };
895                self.modify_established_session_state(|state| {
896                    state
897                        .active_invocations
898                        .insert(message.request, message.registered_registration)
899                })
900                .await?;
901                let receive_progress = message
902                    .details
903                    .get("receive_progress")
904                    .and_then(|val| val.bool())
905                    .unwrap_or(false);
906                let timeout = message
907                    .details
908                    .get("timeout")
909                    .and_then(|val| val.integer())
910                    .unwrap_or(0);
911                let timeout = Duration::from_millis(timeout);
912                let reported_procedure = message
913                    .details
914                    .get("procedure")
915                    .and_then(|val| val.string())
916                    .and_then(|val| Uri::try_from(val).ok());
917
918                let connection_type = if message
919                    .details
920                    .get("battler_wamp_direct_peer")
921                    .is_some_and(|val| val.bool().is_some_and(|val| val))
922                {
923                    ConnectionType::Direct
924                } else {
925                    ConnectionType::Remote(
926                        message
927                            .details
928                            .get("battler_wamp_remote_addr")
929                            .map(|val| val.string())
930                            .flatten()
931                            .unwrap_or_default()
932                            .to_owned(),
933                    )
934                };
935
936                let mut identity = Identity::default();
937                if let Some(auth_id) = message.details.get("caller_authid") {
938                    identity.id = auth_id.string().unwrap_or_default().to_owned();
939                }
940                if let Some(auth_role) = message.details.get("caller_authrole") {
941                    identity.role = auth_role.string().unwrap_or_default().to_owned();
942                }
943
944                let peer_info = PeerInfo {
945                    connection_type,
946                    identity,
947                };
948
949                procedure
950                    .procedure_tx
951                    .send(ProcedureMessage::Invocation(Invocation {
952                        arguments: message.call_arguments,
953                        arguments_keyword: message.call_arguments_keyword,
954                        timeout,
955                        procedure: reported_procedure,
956                        peer_info,
957                        id: message.request,
958                        message_tx: self.service_message_tx.clone(),
959                        receive_progress,
960                    }))?;
961                Ok(())
962            }
963            Message::Result(message) => {
964                let progress = message
965                    .details
966                    .get("progress")
967                    .and_then(|val| val.bool())
968                    .unwrap_or(false);
969                self.rpc_result_tx
970                    .send(Ok(peer_session_message::RpcResult {
971                        request_id: message.call_request,
972                        arguments: message.yield_arguments,
973                        arguments_keyword: message.yield_arguments_keyword,
974                        progress,
975                    }))?;
976                Ok(())
977            }
978            Message::Interrupt(message) => {
979                let procedure = match self
980                    .get_from_established_session_state(|state| {
981                        state
982                            .active_invocations
983                            .get(&message.invocation_request)
984                            .and_then(|id| state.procedures.get(&id))
985                            .cloned()
986                    })
987                    .await?
988                {
989                    Some(procedure) => procedure,
990                    None => return Ok(()),
991                };
992                procedure
993                    .procedure_tx
994                    .send(ProcedureMessage::Interrupt(Interrupt {
995                        id: message.invocation_request,
996                    }))?;
997                Ok(())
998            }
999            _ => Err(InteractionError::ProtocolViolation(format!(
1000                "received {} message on an established session",
1001                message.message_name()
1002            ))
1003            .into()),
1004        }
1005    }
1006
1007    async fn handle_closing(&self, message: Message) -> Result<()> {
1008        match message {
1009            Message::Goodbye(_) => self.transition_state(SessionState::Closed).await,
1010            _ => Err(InteractionError::ProtocolViolation(format!(
1011                "received {} message on a closing session",
1012                message.message_name()
1013            ))
1014            .into()),
1015        }
1016    }
1017
1018    async fn validate_state_transition(&self, state: &SessionState) -> Result<bool> {
1019        let current_state = self.state.read().await;
1020        if current_state.is_same_state(state) {
1021            return Ok(true);
1022        }
1023
1024        if !current_state.allowed_state_transition(&state) {
1025            return Err(BasicError::Internal(format!(
1026                "invalid state transition from {:?} to {state:?}",
1027                self.state
1028            ))
1029            .into());
1030        }
1031
1032        Ok(false)
1033    }
1034
1035    async fn transition_state(&self, state: SessionState) -> Result<()> {
1036        if self.validate_state_transition(&state).await? {
1037            return Ok(());
1038        }
1039
1040        debug!(
1041            "Peer {} transitioned from {:?} to {state:?}",
1042            self.name,
1043            self.state.read().await
1044        );
1045        *self.state.write().await = state;
1046
1047        match &*self.state.read().await {
1048            SessionState::Established(state) => {
1049                info!(
1050                    "Peer {} established session {} on realm {}",
1051                    self.name, state.session_id, state.realm
1052                );
1053                self.id_allocator.reset().await;
1054                self.established_session_tx
1055                    .send(Ok(peer_session_message::EstablishedSession {
1056                        realm: state.realm.clone(),
1057                        welcome_message: state.welcome_message.clone(),
1058                    }))?;
1059            }
1060            SessionState::Closed => {
1061                self.closed_session_tx.send(())?;
1062            }
1063            _ => (),
1064        }
1065
1066        Ok(())
1067    }
1068}