messenger 0.1.0

two way messenger
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;

use futures::{Async, Future, Poll};

use super::{Message, MessageKind, Receiver, Sender};

type Callback<T> = Box<Fn(Vec<T>)>;
type Callbacks<T> = HashMap<u32, Callback<T>>;

type Subscribe<T> = (u32, Box<Fn(&T) -> Option<T>>);
type Subscribers<T> = HashMap<String, Vec<Subscribe<T>>>;

struct MessengerInner<T> {
    sender: Box<Sender<Item = Message<T>>>,
    message_id: AtomicU32,
    subscriber_id: AtomicU32,
    subscribers: RwLock<Subscribers<T>>,
    callbacks: RwLock<Callbacks<T>>,
}

pub struct Messenger<T>(Arc<MessengerInner<T>>);

impl<T> Clone for Messenger<T> {
    #[inline]
    fn clone(&self) -> Self {
        Messenger(self.0.clone())
    }
}

impl<T> Messenger<T> {
    #[inline]
    pub fn new<R, S>(sender: S, receiver: R) -> (Self, impl Future<Item = (), Error = ()>)
    where
        R: Receiver<Item = Message<T>, Error = ()>,
        S: 'static + Sender<Item = Message<T>>,
    {
        let messenger = Messenger(Arc::new(MessengerInner {
            sender: Box::new(sender),
            message_id: AtomicU32::new(0),
            subscriber_id: AtomicU32::new(0),
            subscribers: RwLock::new(HashMap::new()),
            callbacks: RwLock::new(HashMap::new()),
        }));

        let receiver_messenger = messenger.clone();
        let future = ReceiverFuture::new(receiver, move |message| {
            receiver_messenger.on_message(message)
        });

        (messenger, future)
    }

    #[inline]
    pub fn close(&self) {
        let message_id = self.next_message_id();
        let _ = self.0.sender.send(Message::new_close(message_id));
    }

    #[inline]
    pub fn on<N, F>(&self, name: N, f: F) -> u32
    where
        N: Into<String>,
        F: 'static + Fn(&T) -> Option<T>,
    {
        let subscriber_id = self.next_subscriber_id();
        {
            let mut subscribers_mut = self.subscribers_mut();
            let subscribers = subscribers_mut.entry(name.into()).or_insert_with(Vec::new);

            subscribers.push((subscriber_id, Box::new(f)));
        }
        subscriber_id
    }

    #[inline]
    pub fn off<N>(&self, name: N, id: u32)
    where
        N: Into<String>,
    {
        let mut subscribers_mut = self.subscribers_mut();
        let name = name.into();

        let remove = if let Some(subscribers) = subscribers_mut.get_mut(&name) {
            if let Some(index) = subscribers.iter().position(|&(sid, _)| sid == id) {
                subscribers.remove(index);
            }
            subscribers.is_empty()
        } else {
            false
        };

        if remove {
            subscribers_mut.remove(&name);
        }
    }

    #[inline]
    pub fn all_off<N>(&self, name: N)
    where
        N: Into<String>,
    {
        let mut subscribers_mut = self.subscribers_mut();
        let name = name.into();
        subscribers_mut.remove(&name);
    }

    #[inline]
    pub fn send<N, V, F>(&self, name: N, value: V, f: F) -> Result<(), T>
    where
        N: Into<String>,
        V: Into<T>,
        F: 'static + Fn(Vec<T>),
    {
        let (message_id, result) = self.internal_send(name.into(), value.into());
        self.callbacks_mut().insert(message_id, Box::new(f));
        result
    }

    #[inline]
    pub fn send_no_callback<N, V>(&self, name: N, value: V) -> Result<(), T>
    where
        N: Into<String>,
        V: Into<T>,
    {
        let (_, result) = self.internal_send(name.into(), value.into());
        result
    }

    #[inline]
    pub fn internal_close(&self) {
        let message_id = self.next_message_id();
        let _ = self.0.sender.send(Message::new_request_close(message_id));
    }

    #[inline]
    fn internal_send(&self, name: String, value: T) -> (u32, Result<(), T>) {
        let message_id = self.next_message_id();

        let result = self.0
            .sender
            .send(Message::new(message_id, name.into(), value.into()));

        (
            message_id,
            match result {
                Ok(()) => Ok(()),
                Err(message) => match message.take_kind() {
                    MessageKind::Data(_, value) => Err(value),
                    _ => unimplemented!(),
                },
            },
        )
    }

    #[inline]
    fn send_callback(&self, message_id: u32, data: Vec<T>) -> Result<(), Vec<T>> {
        let message = Message::new_callback(message_id, data);

        match self.0.sender.send(message) {
            Ok(()) => Ok(()),
            Err(message) => match message.take_kind() {
                MessageKind::Callback(data) => Err(data),
                _ => unimplemented!(),
            },
        }
    }

    #[inline]
    fn on_message(&self, message: Message<T>) -> bool {
        let message_id = message.id();

        match message.take_kind() {
            MessageKind::Callback(data) => {
                self.on_callback(message_id, data);
                false
            }
            MessageKind::Data(name, data) => {
                self.on_data(message_id, name, data);
                false
            }
            MessageKind::Close => {
                self.internal_close();
                true
            }
            MessageKind::RequestClose => true,
        }
    }

    #[inline]
    fn on_callback(&self, message_id: u32, data: Vec<T>) {
        if let Some(callback) = self.callbacks_mut().remove(&message_id) {
            (callback)(data);
        }
    }

    #[inline]
    fn on_data(&self, message_id: u32, name: String, data: T) {
        let subscribers = self.subscribers();

        if let Some(subscribes) = subscribers.get(&name) {
            let mut out = Vec::with_capacity(subscribes.len());

            for &(_, ref subscribe) in subscribes {
                match (subscribe)(&data) {
                    Some(r) => out.push(r),
                    None => (),
                }
            }

            let _ = self.send_callback(message_id, out);
        }
    }

    #[inline]
    fn callbacks_mut(&self) -> RwLockWriteGuard<Callbacks<T>> {
        self.0
            .callbacks
            .write()
            .expect("failed to acquire callbacks write lock")
    }

    #[inline]
    fn subscribers(&self) -> RwLockReadGuard<Subscribers<T>> {
        self.0
            .subscribers
            .read()
            .expect("failed to acquire subscribers read lock")
    }
    #[inline]
    fn subscribers_mut(&self) -> RwLockWriteGuard<Subscribers<T>> {
        self.0
            .subscribers
            .write()
            .expect("failed to acquire subscribers write lock")
    }

    #[inline]
    fn next_message_id(&self) -> u32 {
        self.0.message_id.fetch_add(1, Ordering::SeqCst)
    }

    #[inline]
    fn next_subscriber_id(&self) -> u32 {
        self.0.subscriber_id.fetch_add(1, Ordering::SeqCst)
    }
}

struct ReceiverFuture<R, F>
where
    R: Receiver,
    F: FnMut(R::Item) -> bool,
{
    receiver: R,
    f: F,
    close: bool,
}

impl<R, F> ReceiverFuture<R, F>
where
    R: Receiver,
    F: FnMut(R::Item) -> bool,
{
    #[inline]
    pub fn new(r: R, f: F) -> Self {
        ReceiverFuture {
            receiver: r,
            f: f,
            close: false,
        }
    }
}

impl<R, F> Future for ReceiverFuture<R, F>
where
    R: Receiver,
    F: FnMut(R::Item) -> bool,
{
    type Item = ();
    type Error = R::Error;

    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            if self.close {
                self.receiver.close();
                return Ok(Async::Ready(()));
            }

            match try_ready!(self.receiver.poll()) {
                Some(v) => self.close = (self.f)(v),
                None => return Ok(Async::Ready(())),
            }
        }
    }
}