wampire 0.2.1

A asynchronous WAMPv2 client and router implenting the basic WAMP profile
Documentation
use std::sync::Arc;

use log::{debug, info};

use crate::{
    messages::{ErrorType, EventDetails, Message, PublishOptions, Reason, SubscribeOptions, URI},
    Dict, Error, ErrorKind, List, MatchingPolicy, WampResult,
};

use super::{messaging::send_message, random_id, ConnectionHandler};

mod patterns;
pub use self::patterns::SubscriptionPatternNode;

impl ConnectionHandler {
    pub fn handle_subscribe(
        &mut self,
        request_id: u64,
        options: SubscribeOptions,
        topic: URI,
    ) -> WampResult<()> {
        debug!(
            "Responding to subscribe message (id: {}, topic: {})",
            request_id, topic.uri
        );
        match self.realm {
            Some(ref realm) => {
                let mut realm = realm.lock().unwrap();
                let manager = &mut realm.subscription_manager;
                let topic_id = {
                    let topic_id = match manager.subscriptions.subscribe_with(
                        &topic,
                        Arc::clone(&self.info),
                        options.pattern_match,
                    ) {
                        Ok(topic_id) => topic_id,
                        Err(e) => {
                            return Err(Error::new(ErrorKind::ErrorReason(
                                ErrorType::Subscribe,
                                request_id,
                                e.reason(),
                            )))
                        }
                    };
                    self.subscribed_topics.push(topic_id);
                    topic_id
                };
                manager.subscription_ids_to_uris.insert(
                    topic_id,
                    (topic.uri, options.pattern_match == MatchingPolicy::Prefix),
                );
                send_message(&self.info, &Message::Subscribed(request_id, topic_id))
            }
            None => Err(Error::new(ErrorKind::InvalidState(
                "Received a message while not attached to a realm",
            ))),
        }
    }

    pub fn handle_unsubscribe(&mut self, request_id: u64, topic_id: u64) -> WampResult<()> {
        match self.realm {
            Some(ref realm) => {
                let mut realm = realm.lock().unwrap();
                let manager = &mut realm.subscription_manager;
                let (topic_uri, is_prefix) = match manager.subscription_ids_to_uris.get(&topic_id) {
                    Some(&(ref uri, ref is_prefix)) => (uri.clone(), *is_prefix),
                    None => {
                        return Err(Error::new(ErrorKind::ErrorReason(
                            ErrorType::Unsubscribe,
                            request_id,
                            Reason::NoSuchSubscription,
                        )))
                    }
                };

                let topic_id = match manager
                    .subscriptions
                    .unsubscribe_with(&topic_uri, &self.info, is_prefix)
                {
                    Ok(topic_id) => topic_id,
                    Err(e) => {
                        return Err(Error::new(ErrorKind::ErrorReason(
                            ErrorType::Unsubscribe,
                            request_id,
                            e.reason(),
                        )))
                    }
                };
                self.subscribed_topics.retain(|id| *id != topic_id);
                send_message(&self.info, &Message::Unsubscribed(request_id))
            }
            None => Err(Error::new(ErrorKind::InvalidState(
                "Received a message while not attached to a realm",
            ))),
        }
    }

    pub fn handle_publish(
        &mut self,
        request_id: u64,
        options: PublishOptions,
        topic: URI,
        args: Option<List>,
        kwargs: Option<Dict>,
    ) -> WampResult<()> {
        debug!(
            "Responding to publish message (id: {}, topic: {})",
            request_id, topic.uri
        );
        match self.realm {
            Some(ref realm) => {
                let realm = realm.lock().unwrap();
                let manager = &realm.subscription_manager;
                let publication_id = random_id();
                let mut event_message =
                    Message::Event(1, publication_id, EventDetails::new(), args, kwargs);
                let my_id = { self.info.lock().unwrap().id };
                info!("Current topic tree: {:?}", manager.subscriptions);
                for (subscriber, topic_id, policy) in manager.subscriptions.filter(topic.clone()) {
                    if subscriber.lock().unwrap().id != my_id {
                        if let Message::Event(
                            ref mut old_topic,
                            ref _publish_id,
                            ref mut details,
                            ref _args,
                            ref _kwargs,
                        ) = event_message
                        {
                            *old_topic = topic_id;
                            details.topic = if policy == MatchingPolicy::Strict {
                                None
                            } else {
                                Some(topic.clone())
                            };
                        }
                        send_message(subscriber, &event_message)?;
                    }
                }
                if options.should_acknowledge() {
                    send_message(&self.info, &Message::Published(request_id, publication_id))?;
                }
                Ok(())
            }
            None => Err(Error::new(ErrorKind::InvalidState(
                "Received a message while not attached to a realm",
            ))),
        }
    }
}