flo_stream/
single_publisher.rs

1use super::subscriber::*;
2use super::pubsub_core::*;
3use super::message_publisher::*;
4
5use futures::future::{BoxFuture};
6
7use std::sync::*;
8use std::collections::{HashMap, VecDeque};
9
10///
11/// A single publisher is a publisher that sends each message to only a single subscriber
12/// rather than all of them
13/// 
14/// This is useful for scheduling messages on the first available worker.
15/// 
16pub struct SinglePublisher<Message> {
17    /// The shared core of this publisher
18    core: Arc<Mutex<PubCore<Message>>>
19}
20
21impl<Message> SinglePublisher<Message> {
22    ///
23    /// Creates a new single publisher, which will buffer the specified number of messages
24    /// 
25    pub fn new(buffer_size: usize) -> SinglePublisher<Message> {
26        // Create the core
27        let core = PubCore {
28            next_subscriber_id: 0,
29            publisher_count:    1,
30            subscribers:        HashMap::new(),
31            notify_closed:      HashMap::new(),
32            waiting:            vec![],
33            max_queue_size:     buffer_size
34        };
35
36        // Build the publisher itself
37        SinglePublisher {
38            core:   Arc::new(Mutex::new(core))
39        }
40    }
41
42    ///
43    /// Counts the number of subscribers in this publisher
44    /// 
45    pub fn count_subscribers(&self) -> usize {
46        self.core.lock().unwrap().subscribers.len()
47    }
48
49    ///
50    /// Creates a duplicate publisher that can be used to publish to the same streams as this object
51    /// 
52    pub fn republish(&self) -> Self {
53        self.core.lock().unwrap().publisher_count += 1;
54
55        SinglePublisher {
56            core:   Arc::clone(&self.core)
57        }
58    }
59}
60
61impl<Message: 'static+Send> MessagePublisher for SinglePublisher<Message> {
62    type Message = Message;
63
64    ///
65    /// Subscribes to this publisher
66    /// 
67    /// Subscribers only receive messages sent to the publisher after they are created.
68    /// 
69    fn subscribe(&mut self) -> Subscriber<Message> {
70        // Assign a subscriber ID
71        let subscriber_id = {
72            let mut core    = self.core.lock().unwrap();
73            let id          = core.next_subscriber_id;
74            core.next_subscriber_id += 1;
75
76            id
77        };
78
79        // Create the subscriber core
80        let sub_core = SubCore {
81            id:                 subscriber_id,
82            published:          true,
83            waiting:            VecDeque::new(),
84            reserved:           0,
85            notify_waiting:     vec![],
86            notify_ready:       vec![],
87            notify_complete:    vec![]
88        };
89
90        // The new subscriber needs a reference to the sub_core and the pub_core
91        let sub_core = Arc::new(Mutex::new(sub_core));
92        let pub_core = Arc::downgrade(&self.core);
93
94        // Register the subscriber with the core, so it will start receiving messages
95        {
96            let mut core = self.core.lock().unwrap();
97            core.subscribers.insert(subscriber_id, Arc::clone(&sub_core));
98        }
99
100        // Create the subscriber
101        Subscriber::new(pub_core, sub_core)
102    }
103
104    ///
105    /// Reserves a space for a message with the subscribers, returning when it's ready
106    ///
107    fn when_ready(&mut self) -> BoxFuture<'static, MessageSender<Message>> {
108        let when_ready  = PubCore::send_single_subscriber(&self.core);
109
110        Box::pin(when_ready)
111    }
112
113    ///
114    /// Waits until all subscribers have consumed all pending messages
115    ///
116    fn when_empty(&mut self) -> BoxFuture<'static, ()> {
117        let when_empty  = PubCore::when_empty(&self.core);
118
119        Box::pin(when_empty)
120    }
121
122    ///
123    /// Returns true if this publisher is closed (will not publish any further messages to its subscribers)
124    ///
125    fn is_closed(&self) -> bool { false }
126
127    ///
128    /// Future that returns when this publisher is closed
129    ///
130    fn when_closed(&self) -> BoxFuture<'static, ()> {
131        Box::pin(CoreClosedFuture::new(Arc::clone(&self.core)))
132    }
133}
134
135impl<Message> Drop for SinglePublisher<Message> {
136    fn drop(&mut self) {
137        let to_notify = {
138            // Lock the core
139            let mut pub_core = self.core.lock().unwrap();
140
141            // Check that this is the last publisher on this core
142            pub_core.publisher_count -= 1;
143            if pub_core.publisher_count == 0 {
144                // Mark all the subscribers as unpublished and notify them so that they close
145                let mut to_notify = pub_core.notify_closed.drain()
146                    .map(|(_id, waker)| waker)
147                    .collect::<Vec<_>>();
148
149                for subscriber in pub_core.subscribers.values() {
150                    let mut subscriber = subscriber.lock().unwrap();
151
152                    // Unpublish the subscriber (so that it hits the end of the stream)
153                    subscriber.published    = false;
154                    subscriber.notify_ready = vec![];
155
156                    // Add to the things to notify once the lock is released
157                    to_notify.extend(subscriber.notify_waiting.drain(..));
158                }
159
160                // Return the notifications outside of the lock
161                to_notify
162            } else {
163                // This is not the last core
164                vec![]
165            }
166        };
167
168        // Notify any subscribers that are waiting that we're unpublished
169        to_notify.into_iter().for_each(|notify| notify.wake());
170    }
171}