use super::subscriber::*;
use super::pubsub_core::*;
use super::message_publisher::*;
use super::subscriber_handle::*;
use futures::future::{BoxFuture};
use std::sync::*;
use std::collections::{HashMap, VecDeque};
/// A publisher handle whose state lives in a `PubCore` shared behind an `Arc<Mutex<..>>`.
///
/// Several handles may point at the same core (see `republish`); the core keeps a count of
/// the live handles in its `publisher_count` field.
pub struct ExpiringPublisher<Message> {
    /// Publisher state shared between every handle created from this one and (weakly) with
    /// the subscribers created by `subscribe`.
    core: Arc<Mutex<PubCore<Message>>>,
}
impl<Message: Clone> ExpiringPublisher<Message> {
    /// Creates a publisher with no subscribers.
    ///
    /// `buffer_size` is stored as the core's `max_queue_size`. The publisher count starts at 1
    /// to account for the handle being returned.
    pub fn new(buffer_size: usize) -> ExpiringPublisher<Message> {
        let shared_state = Arc::new(Mutex::new(PubCore {
            publisher_count: 1,
            subscribers: HashMap::new(),
            notify_closed: HashMap::new(),
            waiting: Vec::new(),
            max_queue_size: buffer_size,
        }));

        ExpiringPublisher { core: shared_state }
    }

    /// Returns how many subscribers are currently registered with the shared core.
    ///
    /// # Panics
    ///
    /// Panics if the core mutex has been poisoned.
    pub fn count_subscribers(&self) -> usize {
        let state = self.core.lock().unwrap();
        state.subscribers.len()
    }

    /// Creates another handle that publishes through the same core as this one.
    ///
    /// The core's `publisher_count` is incremented first so the new handle is accounted for
    /// when handles are later dropped.
    ///
    /// # Panics
    ///
    /// Panics if the core mutex has been poisoned.
    pub fn republish(&self) -> Self {
        {
            let mut state = self.core.lock().unwrap();
            state.publisher_count += 1;
        }

        Self {
            core: Arc::clone(&self.core),
        }
    }
}
impl<Message: 'static + Send + Clone> MessagePublisher for ExpiringPublisher<Message> {
    type Message = Message;

    /// Registers a new subscriber with the shared core and returns it.
    ///
    /// The subscriber starts out marked as published, with an empty waiting queue and nothing
    /// reserved. It receives only a weak reference to the publisher core.
    ///
    /// # Panics
    ///
    /// Panics if the core mutex has been poisoned.
    fn subscribe(&mut self) -> Subscriber<Message> {
        let handle = SubscriberHandle::new();
        let subscriber_state = Arc::new(Mutex::new(SubCore {
            id: handle,
            published: true,
            waiting: VecDeque::new(),
            reserved: 0,
            notify_waiting: HashMap::new(),
            notify_ready: HashMap::new(),
        }));
        let weak_publisher = Arc::downgrade(&self.core);

        // Hold the publisher lock only for as long as it takes to register the subscriber.
        {
            let mut publisher_state = self.core.lock().unwrap();
            publisher_state
                .subscribers
                .insert(handle, Arc::clone(&subscriber_state));
        }

        Subscriber::new(weak_publisher, subscriber_state)
    }

    /// Returns a boxed future, built by `PubCore::send_all_expiring_oldest`, that yields a
    /// `MessageSender` for this publisher.
    ///
    /// NOTE(review): going by the helper's name, this presumably discards the oldest queued
    /// messages when a subscriber's queue is full - confirm against `PubCore`.
    fn when_ready(&mut self) -> BoxFuture<'static, MessageSender<Message>> {
        Box::pin(PubCore::send_all_expiring_oldest(&self.core))
    }

    /// Returns a boxed future built by `PubCore::when_empty` for the shared core.
    fn when_empty(&mut self) -> BoxFuture<'static, ()> {
        Box::pin(PubCore::when_empty(&self.core))
    }

    /// Always reports `false` for this publisher.
    fn is_closed(&self) -> bool {
        false
    }

    /// Returns a boxed `CoreClosedFuture` that holds its own strong reference to the shared core.
    fn when_closed(&self) -> BoxFuture<'static, ()> {
        let shared_state = Arc::clone(&self.core);
        Box::pin(CoreClosedFuture::new(shared_state))
    }
}
impl<Message> Drop for ExpiringPublisher<Message> {
    /// Releases this handle's share of the core.
    ///
    /// The publisher count is decremented. When it reaches zero, every subscriber is marked as
    /// no longer published, its `notify_ready` map is replaced with an empty one, and all wakers
    /// registered in `notify_closed` and in each subscriber's `notify_waiting` are woken.
    ///
    /// # Panics
    ///
    /// Panics if the publisher mutex or any subscriber mutex has been poisoned.
    fn drop(&mut self) {
        // Wakers are gathered under the locks but only woken once the locks are released.
        let mut pending_wakers = Vec::new();

        {
            let mut publisher_state = self.core.lock().unwrap();
            publisher_state.publisher_count -= 1;

            if publisher_state.publisher_count == 0 {
                // Last handle gone: anything waiting for the close needs to hear about it.
                pending_wakers.extend(
                    publisher_state
                        .notify_closed
                        .drain()
                        .map(|(_id, waker)| waker),
                );

                for shared_subscriber in publisher_state.subscribers.values() {
                    let mut subscriber_state = shared_subscriber.lock().unwrap();

                    subscriber_state.published = false;
                    subscriber_state.notify_ready = HashMap::new();
                    pending_wakers.extend(
                        subscriber_state
                            .notify_waiting
                            .drain()
                            .map(|(_, waker)| waker),
                    );
                }
            }
        }

        for waker in pending_wakers {
            waker.wake();
        }
    }
}