use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;
use futures::{Async, Future, Poll};
use super::{Message, MessageKind, Receiver, Sender};
/// Boxed reply handler; it is stored under an outgoing message id and invoked with the
/// payload of the matching callback message.
type Callback<T> = Box<Fn(Vec<T>)>;
/// Pending reply handlers keyed by the id of the message they belong to.
type Callbacks<T> = HashMap<u32, Callback<T>>;
/// A subscriber id paired with its handler. The handler receives incoming data by
/// reference and may return a value to be included in the reply.
type Subscribe<T> = (u32, Box<Fn(&T) -> Option<T>>);
/// Subscriber lists keyed by message name.
type Subscribers<T> = HashMap<String, Vec<Subscribe<T>>>;
/// Shared state behind every `Messenger` handle.
struct MessengerInner<T> {
// Outgoing side of the transport; every message produced by the messenger goes through it.
sender: Box<Sender<Item = Message<T>>>,
// Counter that hands out ids for outgoing messages (starts at 0).
message_id: AtomicU32,
// Counter that hands out ids for registered subscribers (starts at 0).
subscriber_id: AtomicU32,
// Handlers registered with `on`, grouped by message name.
subscribers: RwLock<Subscribers<T>>,
// Reply handlers registered by `send`, keyed by message id.
callbacks: RwLock<Callbacks<T>>,
}
/// Handle to a messenger. It wraps the shared state in an `Arc`, so clones refer to the
/// same sender, subscribers and pending callbacks.
pub struct Messenger<T>(Arc<MessengerInner<T>>);
impl<T> Clone for Messenger<T> {
#[inline]
fn clone(&self) -> Self {
Messenger(self.0.clone())
}
}
impl<T> Messenger<T> {
    /// Creates a messenger together with the future that drives its receiving side.
    ///
    /// `sender` is boxed and used for every outgoing message. `receiver` is moved into the
    /// returned future, which hands each received message to this messenger and completes
    /// once a close/request-close message has been handled or the receiver yields `None`.
    /// The future owns its own clone of the messenger handle.
    #[inline]
    pub fn new<R, S>(sender: S, receiver: R) -> (Self, impl Future<Item = (), Error = ()>)
    where
        R: Receiver<Item = Message<T>, Error = ()>,
        S: 'static + Sender<Item = Message<T>>,
    {
        let messenger = Messenger(Arc::new(MessengerInner {
            sender: Box::new(sender),
            message_id: AtomicU32::new(0),
            subscriber_id: AtomicU32::new(0),
            subscribers: RwLock::new(HashMap::new()),
            callbacks: RwLock::new(HashMap::new()),
        }));
        let receiver_messenger = messenger.clone();
        let future = ReceiverFuture::new(receiver, move |message| {
            receiver_messenger.on_message(message)
        });
        (messenger, future)
    }

    /// Sends a close message with a fresh message id.
    ///
    /// This is best effort: a failure reported by the sender is ignored.
    #[inline]
    pub fn close(&self) {
        let message_id = self.next_message_id();
        let _ = self.0.sender.send(Message::new_close(message_id));
    }

    /// Registers `f` as a subscriber for messages named `name` and returns its id, which
    /// can later be passed to [`off`](#method.off).
    ///
    /// `f` receives the incoming data by reference; every `Some` value returned by the
    /// subscribers of a name is collected into the reply for that message.
    ///
    /// Subscribers are invoked while the subscribers read lock is held, so `f` must not
    /// call `on`, `off` or `all_off` on the same messenger.
    ///
    /// # Panics
    ///
    /// Panics if the subscribers lock is poisoned.
    #[inline]
    pub fn on<N, F>(&self, name: N, f: F) -> u32
    where
        N: Into<String>,
        F: 'static + Fn(&T) -> Option<T>,
    {
        let subscriber_id = self.next_subscriber_id();
        {
            let mut subscribers_mut = self.subscribers_mut();
            let subscribers = subscribers_mut
                .entry(name.into())
                .or_insert_with(Vec::new);
            subscribers.push((subscriber_id, Box::new(f)));
        }
        subscriber_id
    }

    /// Removes the subscriber `id` from the list registered under `name`.
    ///
    /// When the list becomes empty its map entry is removed as well. Unknown names or ids
    /// are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the subscribers lock is poisoned.
    #[inline]
    pub fn off<N>(&self, name: N, id: u32)
    where
        N: Into<String>,
    {
        let mut subscribers_mut = self.subscribers_mut();
        let name = name.into();
        // Decide inside the borrow, remove outside of it: the entry cannot be dropped
        // while `get_mut` still borrows the map.
        let remove = if let Some(subscribers) = subscribers_mut.get_mut(&name) {
            if let Some(index) = subscribers.iter().position(|&(sid, _)| sid == id) {
                subscribers.remove(index);
            }
            subscribers.is_empty()
        } else {
            false
        };
        if remove {
            subscribers_mut.remove(&name);
        }
    }

    /// Removes every subscriber registered under `name`.
    ///
    /// # Panics
    ///
    /// Panics if the subscribers lock is poisoned.
    #[inline]
    pub fn all_off<N>(&self, name: N)
    where
        N: Into<String>,
    {
        let name = name.into();
        let mut subscribers_mut = self.subscribers_mut();
        subscribers_mut.remove(&name);
    }

    /// Sends `value` under `name` and registers `f` to be called with the reply.
    ///
    /// `f` is stored under the id of the outgoing message and is removed from the map
    /// right before it is invoked, so it runs at most once.
    ///
    /// The callback is registered *before* the message is handed to the sender, so a
    /// reply that is delivered immediately still finds it. If the sender rejects the
    /// message, the callback is removed again because no reply can ever arrive for it.
    ///
    /// # Errors
    ///
    /// Returns the original value when the sender fails to send the message.
    ///
    /// # Panics
    ///
    /// Panics if the callbacks lock is poisoned.
    #[inline]
    pub fn send<N, V, F>(&self, name: N, value: V, f: F) -> Result<(), T>
    where
        N: Into<String>,
        V: Into<T>,
        F: 'static + Fn(Vec<T>),
    {
        let message_id = self.next_message_id();
        // The write guard is a temporary of this statement, so it is released before the
        // sender runs.
        self.callbacks_mut().insert(message_id, Box::new(f));
        let result = self.send_data(message_id, name.into(), value.into());
        if result.is_err() {
            // Bound to a local so the callback is dropped after the guard is released.
            let _unsent = self.callbacks_mut().remove(&message_id);
        }
        result
    }

    /// Sends `value` under `name` without registering a reply callback.
    ///
    /// # Errors
    ///
    /// Returns the original value when the sender fails to send the message.
    #[inline]
    pub fn send_no_callback<N, V>(&self, name: N, value: V) -> Result<(), T>
    where
        N: Into<String>,
        V: Into<T>,
    {
        let (_, result) = self.internal_send(name.into(), value.into());
        result
    }

    /// Sends a request-close message with a fresh message id.
    ///
    /// Called when a close message is received; a failure reported by the sender is
    /// ignored.
    #[inline]
    pub fn internal_close(&self) {
        let message_id = self.next_message_id();
        let _ = self.0.sender.send(Message::new_request_close(message_id));
    }

    /// Allocates a message id, sends `value` under `name` and returns the id together
    /// with the outcome of the send.
    #[inline]
    fn internal_send(&self, name: String, value: T) -> (u32, Result<(), T>) {
        let message_id = self.next_message_id();
        (message_id, self.send_data(message_id, name, value))
    }

    /// Sends a data message with an already allocated `message_id`, giving the value back
    /// on failure.
    #[inline]
    fn send_data(&self, message_id: u32, name: String, value: T) -> Result<(), T> {
        match self.0.sender.send(Message::new(message_id, name, value)) {
            Ok(()) => Ok(()),
            Err(message) => match message.take_kind() {
                MessageKind::Data(_, value) => Err(value),
                // NOTE(review): presumably the sender hands back the message it was
                // given, which makes this arm unreachable - confirm against `Sender`.
                _ => unimplemented!(),
            },
        }
    }

    /// Sends `data` as the reply to the message `message_id`, giving the data back on
    /// failure.
    #[inline]
    fn send_callback(&self, message_id: u32, data: Vec<T>) -> Result<(), Vec<T>> {
        let message = Message::new_callback(message_id, data);
        match self.0.sender.send(message) {
            Ok(()) => Ok(()),
            Err(message) => match message.take_kind() {
                MessageKind::Callback(data) => Err(data),
                // NOTE(review): same assumption as in `send_data` - confirm.
                _ => unimplemented!(),
            },
        }
    }

    /// Dispatches a received message and returns `true` when receiving should stop.
    ///
    /// Callback and data messages are forwarded to their handlers. A close message is
    /// answered with a request-close message before stopping; a request-close message
    /// just stops.
    #[inline]
    fn on_message(&self, message: Message<T>) -> bool {
        let message_id = message.id();
        match message.take_kind() {
            MessageKind::Callback(data) => {
                self.on_callback(message_id, data);
                false
            }
            MessageKind::Data(name, data) => {
                self.on_data(message_id, name, data);
                false
            }
            MessageKind::Close => {
                self.internal_close();
                true
            }
            MessageKind::RequestClose => true,
        }
    }

    /// Invokes and forgets the callback registered for `message_id`, if there is one.
    #[inline]
    fn on_callback(&self, message_id: u32, data: Vec<T>) {
        // Take the callback out in its own statement: as an `if let` scrutinee the write
        // guard would stay alive for the whole body, and a callback calling `send` on
        // this messenger would then try to re-lock `callbacks` on the same thread.
        let callback = self.callbacks_mut().remove(&message_id);
        if let Some(callback) = callback {
            (callback)(data);
        }
    }

    /// Runs every subscriber registered under `name` with `data` and replies to
    /// `message_id` with the values they returned.
    ///
    /// Nothing is sent when no subscriber list exists for `name`.
    // NOTE(review): in that case the peer's reply callback (if it registered one) is
    // presumably never invoked - confirm whether an empty reply should be sent instead.
    #[inline]
    fn on_data(&self, message_id: u32, name: String, data: T) {
        let subscribers = self.subscribers();
        let replies: Option<Vec<T>> = subscribers.get(&name).map(|subscribes| {
            subscribes
                .iter()
                .filter_map(|&(_, ref subscribe)| (subscribe)(&data))
                .collect()
        });
        // Release the read lock before handing the reply to the sender.
        drop(subscribers);
        if let Some(out) = replies {
            let _ = self.send_callback(message_id, out);
        }
    }

    /// Acquires the callbacks map for writing; panics if the lock is poisoned.
    #[inline]
    fn callbacks_mut(&self) -> RwLockWriteGuard<Callbacks<T>> {
        self.0
            .callbacks
            .write()
            .expect("failed to acquire callbacks write lock")
    }

    /// Acquires the subscribers map for reading; panics if the lock is poisoned.
    #[inline]
    fn subscribers(&self) -> RwLockReadGuard<Subscribers<T>> {
        self.0
            .subscribers
            .read()
            .expect("failed to acquire subscribers read lock")
    }

    /// Acquires the subscribers map for writing; panics if the lock is poisoned.
    #[inline]
    fn subscribers_mut(&self) -> RwLockWriteGuard<Subscribers<T>> {
        self.0
            .subscribers
            .write()
            .expect("failed to acquire subscribers write lock")
    }

    /// Returns the current message id and advances the counter by one.
    #[inline]
    fn next_message_id(&self) -> u32 {
        self.0.message_id.fetch_add(1, Ordering::SeqCst)
    }

    /// Returns the current subscriber id and advances the counter by one.
    #[inline]
    fn next_subscriber_id(&self) -> u32 {
        self.0.subscriber_id.fetch_add(1, Ordering::SeqCst)
    }
}
/// Future that polls a receiver and feeds every received item to a handler until the
/// handler asks to stop or the receiver yields `None`.
struct ReceiverFuture<R, F>
where
R: Receiver,
F: FnMut(R::Item) -> bool,
{
// Source of incoming items.
receiver: R,
// Handler for each item; its return value is stored in `close`.
f: F,
// Set once the handler returned `true`; the next loop turn closes the receiver.
close: bool,
}
impl<R, F> ReceiverFuture<R, F>
where
    R: Receiver,
    F: FnMut(R::Item) -> bool,
{
    /// Builds a future that reads from `r` and passes each item to `f`.
    ///
    /// The future starts in the "not closing" state.
    #[inline]
    pub fn new(r: R, f: F) -> Self {
        let receiver = r;
        ReceiverFuture {
            receiver,
            f,
            close: false,
        }
    }
}
impl<R, F> Future for ReceiverFuture<R, F>
where
    R: Receiver,
    F: FnMut(R::Item) -> bool,
{
    type Item = ();
    type Error = R::Error;

    /// Drains the receiver, handing each item to the handler.
    ///
    /// Completes when the receiver yields `None`, or - after the handler returned
    /// `true` - once the receiver has been closed. Not-ready and error results from the
    /// receiver are propagated by `try_ready!`.
    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while !self.close {
            let received = try_ready!(self.receiver.poll());
            match received {
                None => return Ok(Async::Ready(())),
                Some(item) => {
                    self.close = (self.f)(item);
                }
            }
        }
        // The handler asked to stop: close the receiver before completing.
        self.receiver.close();
        Ok(Async::Ready(()))
    }
}