1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
use super::subscriber::*;
use super::pubsub_core::*;
use super::publisher_sink::*;

use futures::*;
use futures::sink::Sink;

use std::sync::*;
use std::collections::{HashMap, VecDeque};

///
/// A publisher is a sink that relays every message it is sent to zero or more subscribers
///
/// Call `subscribe()` to create subscribers. Any messages sent to this sink will be relayed to all connected
/// subscribers. Several publishers can share the same set of subscribers (see `republish()`); once the last
/// of them is dropped, the connected subscribers are marked as unpublished and woken, so that they relay
/// the messages that were already sent and then indicate that they have finished.
///
pub struct Publisher<Message> {
    /// The core of this publisher, shared (behind a mutex) with every publisher created from it via `republish()`
    core: Arc<Mutex<PubCore<Message>>>
}

impl<Message: Clone> Publisher<Message> {
    ///
    /// Builds a publisher with no subscribers
    ///
    /// `buffer_size` is stored in the core as its maximum queue size. The new core starts out
    /// with a publisher count of one (this object) and hands out subscriber IDs starting from 0.
    ///
    pub fn new(buffer_size: usize) -> Publisher<Message> {
        Publisher {
            core: Arc::new(Mutex::new(PubCore {
                next_subscriber_id: 0,
                publisher_count:    1,
                subscribers:        HashMap::new(),
                max_queue_size:     buffer_size
            }))
        }
    }

    ///
    /// Returns how many subscribers are currently registered with the shared core
    ///
    pub fn count_subscribers(&self) -> usize {
        let shared = self.core.lock().unwrap();

        shared.subscribers.len()
    }

    ///
    /// Returns another publisher that sends to the same subscribers as this one
    ///
    /// The core's publisher count is increased to account for the new object, which shares
    /// this publisher's core rather than copying it.
    ///
    pub fn republish(&self) -> Self {
        // Record the extra publisher before handing out the new reference to the core
        {
            let mut shared = self.core.lock().unwrap();
            shared.publisher_count += 1;
        }

        Publisher { core: Arc::clone(&self.core) }
    }
}

impl<Message: Clone> PublisherSink<Message> for Publisher<Message> {
    ///
    /// Subscribes to this publisher
    ///
    /// Subscribers only receive messages sent to the publisher after they are created.
    ///
    /// The subscriber ID is allocated and the subscriber is registered under a single lock of the
    /// publisher core, so other publishers sharing the core never observe an ID that has been
    /// handed out but not yet registered.
    ///
    fn subscribe(&mut self) -> Subscriber<Message> {
        // The subscriber only holds a weak reference back to the publisher core
        let pub_core = Arc::downgrade(&self.core);

        let sub_core = {
            let mut core = self.core.lock().unwrap();

            // Assign a subscriber ID
            let subscriber_id = core.next_subscriber_id;
            core.next_subscriber_id += 1;

            // Create the subscriber core: published, with nothing queued and nothing waiting to be notified
            let sub_core = Arc::new(Mutex::new(SubCore {
                id:                 subscriber_id,
                published:          true,
                waiting:            VecDeque::new(),
                notify_waiting:     vec![],
                notify_ready:       vec![],
                notify_complete:    vec![]
            }));

            // Register the subscriber with the core, so it will start receiving messages
            core.subscribers.insert(subscriber_id, Arc::clone(&sub_core));

            sub_core
        };

        // Create the subscriber once the core lock has been released
        Subscriber::new(pub_core, sub_core)
    }
}

impl<Message> Drop for Publisher<Message> {
    ///
    /// Releases this publisher's claim on the shared core
    ///
    /// The core's publisher count is decremented. If this was the last publisher, every subscriber
    /// is marked as unpublished, its `notify_ready` list is discarded and anything in its
    /// `notify_waiting` list is notified after the locks have been released.
    ///
    /// Poisoned mutexes are recovered rather than unwrapped: a panic raised from `drop` while the
    /// thread is already unwinding aborts the process, and the subscribers still need to be told
    /// that the stream has ended.
    ///
    fn drop(&mut self) {
        let to_notify = {
            // Lock the core, tolerating poisoning so that dropping never panics here
            let mut pub_core = self.core.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

            // Check that this is the last publisher on this core
            pub_core.publisher_count -= 1;
            if pub_core.publisher_count == 0 {
                // Mark all the subscribers as unpublished and notify them so that they close
                let mut to_notify = vec![];

                for subscriber in pub_core.subscribers.values() {
                    let mut subscriber = subscriber.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

                    // Unpublish the subscriber (so that it hits the end of the stream)
                    subscriber.published    = false;
                    subscriber.notify_ready = vec![];

                    // Add to the things to notify once the lock is released
                    to_notify.extend(subscriber.notify_waiting.drain(..));
                }

                // Return the notifications outside of the lock
                to_notify
            } else {
                // Other publishers still share this core, so the subscribers stay published
                vec![]
            }
        };

        // Notify any subscribers that are waiting that we're unpublished
        to_notify.into_iter().for_each(|notify| notify.notify());
    }
}

impl<Message: Clone> Sink for Publisher<Message> {
    type SinkItem   = Message;
    type SinkError  = ();

    ///
    /// Attempts to publish a message to the subscribers
    ///
    /// The item is passed to the shared core's `publish`. If that returns a list of things to
    /// notify, they are notified once the core lock has been released and the item counts as
    /// sent. If it returns nothing, the item is handed back as `NotReady`.
    ///
    fn start_send(&mut self, item: Message) -> StartSend<Message, ()> {
        // The lock guard is a temporary of this statement, so it is released before anything is notified
        let published = self.core.lock().unwrap().publish(&item);

        match published {
            Some(waiting) => {
                // Wake everything that was waiting on the newly published item
                for task in waiting {
                    task.notify();
                }

                Ok(AsyncSink::Ready)
            }

            // The core refused the item (a subscriber queue is full), so give it back to the caller
            None => Ok(AsyncSink::NotReady(item))
        }
    }

    ///
    /// Reports whether the shared core considers publishing to be complete
    ///
    /// Returns `Ready` when the core's `complete()` check passes and `NotReady` otherwise.
    ///
    fn poll_complete(&mut self) -> Poll<(), ()> {
        let all_ready = self.core.lock().unwrap().complete();

        Ok(if all_ready {
            // Every subscriber can accept another message
            Async::Ready(())
        } else {
            // At least one subscriber still has a full buffer
            Async::NotReady
        })
    }
}