use super::subscriber::*;
use super::pubsub_core::*;
use super::message_publisher::*;
use super::subscriber_handle::*;
use futures::future;
use futures::future::{BoxFuture};
use std::sync::*;
use std::collections::{VecDeque, HashMap};
pub struct WeakPublisher<Message> {
pub (super) core: Weak<Mutex<PubCore<Message>>>
}
impl<Message: Clone> WeakPublisher<Message> {
pub fn count_subscribers(&self) -> usize {
self.core.upgrade()
.map(|core| core.lock().unwrap().subscribers.len())
.unwrap_or(0)
}
pub fn republish(&self) -> Self {
WeakPublisher {
core: Weak::clone(&self.core)
}
}
}
impl<Message: 'static+Send+Clone> MessagePublisher for WeakPublisher<Message> {
type Message = Message;
fn subscribe(&mut self) -> Subscriber<Message> {
let core = self.core.upgrade();
if let Some(core) = core {
let subscriber_id = SubscriberHandle::new();
let sub_core = SubCore {
id: subscriber_id,
published: true,
waiting: VecDeque::new(),
reserved: 0,
notify_waiting: HashMap::new(),
notify_ready: HashMap::new(),
};
let sub_core = Arc::new(Mutex::new(sub_core));
let pub_core = Arc::downgrade(&core);
{
let mut core = core.lock().unwrap();
core.subscribers.insert(subscriber_id, Arc::clone(&sub_core));
}
Subscriber::new(pub_core, sub_core)
} else {
let sub_core = SubCore {
id: SubscriberHandle::new(),
published: true,
waiting: VecDeque::new(),
reserved: 0,
notify_waiting: HashMap::new(),
notify_ready: HashMap::new(),
};
Subscriber::new(Weak::default(), Arc::new(Mutex::new(sub_core)))
}
}
fn when_ready(&mut self) -> BoxFuture<'static, MessageSender<Message>> {
let core = self.core.upgrade();
if let Some(core) = core {
let when_ready = PubCore::send_all_subscribers(&core);
Box::pin(when_ready)
} else {
Box::pin(future::ready(MessageSender::new(|_msg| {}, || {})))
}
}
fn when_empty(&mut self) -> BoxFuture<'static, ()> {
let core = self.core.upgrade();
if let Some(core) = core {
let when_empty = PubCore::when_empty(&core);
Box::pin(when_empty)
} else {
Box::pin(future::ready(()))
}
}
fn is_closed(&self) -> bool {
self.core.upgrade().is_none()
}
fn when_closed(&self) -> BoxFuture<'static, ()> {
if let Some(core) = self.core.upgrade() {
Box::pin(CoreClosedFuture::new(core))
} else {
Box::pin(future::ready(()))
}
}
}