flo_stream/
single_publisher.rs1use super::subscriber::*;
2use super::pubsub_core::*;
3use super::message_publisher::*;
4
5use futures::future::{BoxFuture};
6
7use std::sync::*;
8use std::collections::{HashMap, VecDeque};
9
10pub struct SinglePublisher<Message> {
17 core: Arc<Mutex<PubCore<Message>>>
19}
20
21impl<Message> SinglePublisher<Message> {
22 pub fn new(buffer_size: usize) -> SinglePublisher<Message> {
26 let core = PubCore {
28 next_subscriber_id: 0,
29 publisher_count: 1,
30 subscribers: HashMap::new(),
31 notify_closed: HashMap::new(),
32 waiting: vec![],
33 max_queue_size: buffer_size
34 };
35
36 SinglePublisher {
38 core: Arc::new(Mutex::new(core))
39 }
40 }
41
42 pub fn count_subscribers(&self) -> usize {
46 self.core.lock().unwrap().subscribers.len()
47 }
48
49 pub fn republish(&self) -> Self {
53 self.core.lock().unwrap().publisher_count += 1;
54
55 SinglePublisher {
56 core: Arc::clone(&self.core)
57 }
58 }
59}
60
61impl<Message: 'static+Send> MessagePublisher for SinglePublisher<Message> {
62 type Message = Message;
63
64 fn subscribe(&mut self) -> Subscriber<Message> {
70 let subscriber_id = {
72 let mut core = self.core.lock().unwrap();
73 let id = core.next_subscriber_id;
74 core.next_subscriber_id += 1;
75
76 id
77 };
78
79 let sub_core = SubCore {
81 id: subscriber_id,
82 published: true,
83 waiting: VecDeque::new(),
84 reserved: 0,
85 notify_waiting: vec![],
86 notify_ready: vec![],
87 notify_complete: vec![]
88 };
89
90 let sub_core = Arc::new(Mutex::new(sub_core));
92 let pub_core = Arc::downgrade(&self.core);
93
94 {
96 let mut core = self.core.lock().unwrap();
97 core.subscribers.insert(subscriber_id, Arc::clone(&sub_core));
98 }
99
100 Subscriber::new(pub_core, sub_core)
102 }
103
104 fn when_ready(&mut self) -> BoxFuture<'static, MessageSender<Message>> {
108 let when_ready = PubCore::send_single_subscriber(&self.core);
109
110 Box::pin(when_ready)
111 }
112
113 fn when_empty(&mut self) -> BoxFuture<'static, ()> {
117 let when_empty = PubCore::when_empty(&self.core);
118
119 Box::pin(when_empty)
120 }
121
122 fn is_closed(&self) -> bool { false }
126
127 fn when_closed(&self) -> BoxFuture<'static, ()> {
131 Box::pin(CoreClosedFuture::new(Arc::clone(&self.core)))
132 }
133}
134
135impl<Message> Drop for SinglePublisher<Message> {
136 fn drop(&mut self) {
137 let to_notify = {
138 let mut pub_core = self.core.lock().unwrap();
140
141 pub_core.publisher_count -= 1;
143 if pub_core.publisher_count == 0 {
144 let mut to_notify = pub_core.notify_closed.drain()
146 .map(|(_id, waker)| waker)
147 .collect::<Vec<_>>();
148
149 for subscriber in pub_core.subscribers.values() {
150 let mut subscriber = subscriber.lock().unwrap();
151
152 subscriber.published = false;
154 subscriber.notify_ready = vec![];
155
156 to_notify.extend(subscriber.notify_waiting.drain(..));
158 }
159
160 to_notify
162 } else {
163 vec![]
165 }
166 };
167
168 to_notify.into_iter().for_each(|notify| notify.wake());
170 }
171}