1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use super::subscriber::*;
use super::pubsub_core::*;
use super::publisher_sink::*;

use futures::*;

use std::sync::*;
use std::collections::{HashMap, VecDeque};

///
/// A single publisher is a publisher that sends each message to only a single subscriber
/// rather than all of them
/// 
/// This is useful for scheduling messages on the first available worker.
/// 
/// All state lives in a core behind an `Arc<Mutex<..>>`. Publishers returned by `republish()`
/// share that same core, and subscribers created by `subscribe()` are handed a weak reference
/// to it.
/// 
pub struct SinglePublisher<Message> {
    /// The shared core of this publisher (it holds the subscriber table, the number of live
    /// publishers, the next subscriber ID to hand out and the maximum queue size)
    core: Arc<Mutex<PubCore<Message>>>
}

impl<Message> SinglePublisher<Message> {
    ///
    /// Builds a publisher that has no subscribers yet
    ///
    /// `buffer_size` is stored as the `max_queue_size` of the shared core. The core starts
    /// with a publisher count of 1, which accounts for the publisher returned from this call.
    ///
    pub fn new(buffer_size: usize) -> SinglePublisher<Message> {
        // The core is wrapped for sharing straight away: `republish()` clones the `Arc`
        let shared_core = Arc::new(Mutex::new(PubCore {
            next_subscriber_id: 0,
            publisher_count:    1,
            subscribers:        HashMap::new(),
            max_queue_size:     buffer_size
        }));

        SinglePublisher { core: shared_core }
    }

    ///
    /// Returns how many subscribers are currently registered in the shared core
    ///
    /// Panics if the core's mutex has been poisoned.
    ///
    pub fn count_subscribers(&self) -> usize {
        let core = self.core.lock().unwrap();

        core.subscribers.len()
    }

    ///
    /// Returns another publisher that shares this publisher's core (and so publishes to the
    /// same set of subscribers)
    ///
    /// The core's publisher count is incremented, so the subscribers are only unpublished
    /// once every publisher sharing the core has been dropped.
    ///
    pub fn republish(&self) -> Self {
        {
            // Record the extra publisher before handing out the new handle
            let mut core = self.core.lock().unwrap();
            core.publisher_count += 1;
        }

        Self { core: Arc::clone(&self.core) }
    }
}

impl<Message> PublisherSink<Message> for SinglePublisher<Message> {
    ///
    /// Creates a new subscriber and registers it with this publisher's core
    ///
    /// The subscriber starts with an empty queue of waiting messages, so it only sees
    /// messages that are published after this call.
    ///
    fn subscribe(&mut self) -> Subscriber<Message> {
        // Take the next free ID from the core and advance the counter
        let subscriber_id = {
            let mut pub_core = self.core.lock().unwrap();
            let allocated    = pub_core.next_subscriber_id;
            pub_core.next_subscriber_id += 1;

            allocated
        };

        // State for the new subscriber: marked as published, nothing queued, nothing to notify
        let shared_sub_core = Arc::new(Mutex::new(SubCore {
            id:                 subscriber_id,
            published:          true,
            waiting:            VecDeque::new(),
            notify_waiting:     Vec::new(),
            notify_ready:       Vec::new(),
            notify_complete:    Vec::new()
        }));

        // The subscriber only holds a weak reference back to the publisher core
        let weak_pub_core = Arc::downgrade(&self.core);

        // Once it is in the subscriber table, the core can deliver messages to it
        self.core.lock().unwrap().subscribers.insert(subscriber_id, Arc::clone(&shared_sub_core));

        Subscriber::new(weak_pub_core, shared_sub_core)
    }
}

impl<Message> Drop for SinglePublisher<Message> {
    ///
    /// Releases this publisher's claim on the shared core
    ///
    /// The core's publisher count is decremented. When it reaches zero (no publisher created
    /// via `new()` or `republish()` remains), every subscriber is marked as unpublished, its
    /// `notify_ready` list is discarded, and everything in its `notify_waiting` list is
    /// notified after the core lock has been released.
    ///
    /// Poisoned mutexes are recovered rather than unwrapped. A publisher is often dropped
    /// while a panic is unwinding (for example, a panic raised while the core lock is held
    /// poisons the core and then drops the publisher). Panicking again inside `drop` at that
    /// point would abort the process instead of letting the original panic propagate.
    ///
    fn drop(&mut self) {
        let to_notify = {
            // Lock the core (taking the guard out of the error if the lock is poisoned)
            let mut pub_core = self.core.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

            // Check that this is the last publisher on this core
            pub_core.publisher_count -= 1;
            if pub_core.publisher_count == 0 {
                // Mark all the subscribers as unpublished and notify them so that they close
                let mut to_notify = vec![];

                for subscriber in pub_core.subscribers.values() {
                    // A poisoned subscriber still needs to be unpublished
                    let mut subscriber = subscriber.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

                    // Unpublish the subscriber (so that it hits the end of the stream)
                    subscriber.published    = false;
                    subscriber.notify_ready = vec![];

                    // Add to the things to notify once the lock is released
                    to_notify.extend(subscriber.notify_waiting.drain(..));
                }

                // Return the notifications outside of the lock
                to_notify
            } else {
                // Other publishers still share this core, so the subscribers stay published
                vec![]
            }
        };

        // Notify any subscribers that are waiting that we're unpublished
        to_notify.into_iter().for_each(|notify| notify.notify());
    }
}

impl<Message> Sink for SinglePublisher<Message> {
    type SinkItem   = Message;
    type SinkError  = ();

    ///
    /// Hands a message to the shared core for delivery via `publish_single`
    ///
    /// Returns `AsyncSink::Ready` when the core reports the message as published, after
    /// calling `notify()` on everything the core returned. Returns `AsyncSink::NotReady`
    /// with the message handed back when the core reports it as not published.
    ///
    fn start_send(&mut self, item: Message) -> StartSend<Message, ()> {
        // The core lock is only held for this statement, so notifications happen unlocked
        let outcome = self.core.lock().unwrap().publish_single(item);

        match outcome {
            PublishSingleOutcome::NotPublished(rejected) => {
                // NOTE(review): presumably no subscriber had space in its queue - confirm against publish_single
                Ok(AsyncSink::NotReady(rejected))
            },

            PublishSingleOutcome::Published(to_notify) => {
                // Wake up whatever the core asked us to notify about the new message
                for waiting in to_notify {
                    waiting.notify();
                }

                Ok(AsyncSink::Ready)
            }
        }
    }

    ///
    /// Reports `Async::Ready` when the core's `complete()` check returns true, and
    /// `Async::NotReady` otherwise
    ///
    /// NOTE(review): presumably `complete()` checks the subscriber queues and arranges for the
    /// current task to be notified when it returns false - confirm in the core.
    ///
    fn poll_complete(&mut self) -> Poll<(), ()> {
        let is_complete = self.core.lock().unwrap().complete();

        Ok(if is_complete { Async::Ready(()) } else { Async::NotReady })
    }
}