1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use super::pubsub_core::*;

use futures::*;
use futures::task;

use std::sync::*;

///
/// Represents a subscriber stream from a publisher sink
/// 
pub struct Subscriber<Message> {
    /// The publisher core (shared between all subscribers)
    /// 
    /// Note that when locking the pub_core must always be locked first (if it needs to be locked)
    pub_core: Weak<Mutex<PubCore<Message>>>,

    /// The subscriber core (used only by this subscriber)
    /// 
    /// Note that when locking the pub_core must always be locked first (if it needs to be locked)
    sub_core: Arc<Mutex<SubCore<Message>>>
}

impl<Message> Subscriber<Message> {
    ///
    /// Creates a new subscriber
    /// 
    pub (crate) fn new(pub_core: Weak<Mutex<PubCore<Message>>>, sub_core: Arc<Mutex<SubCore<Message>>>) -> Subscriber<Message> {
        Subscriber {
            pub_core,
            sub_core
        }
    }
}

impl<Message: Clone> Subscriber<Message> {
    ///
    /// Resubscribes to the same publisher as this stream.
    /// 
    /// The new subscriber will receive any future messages that are also  destined for this stream, but will not
    /// receive any past messages.
    /// 
    pub fn resubscribe(&self) -> Self {
        let pub_core = self.pub_core.upgrade();

        if let Some(pub_core) = pub_core {
            let new_sub_core = {
                // Lock the cores
                let mut pub_core    = pub_core.lock().unwrap();
                let sub_core        = self.sub_core.lock().unwrap();

                // Assign an ID
                let new_id = pub_core.next_subscriber_id;
                pub_core.next_subscriber_id += 1;

                // Generate a new core for the clone
                let new_sub_core = SubCore {
                    id:                 new_id,
                    published:          true,
                    waiting:            sub_core.waiting.clone(),
                    notify_waiting:     vec![],
                    notify_ready:       vec![],
                    notify_complete:    vec![]
                };
                let new_sub_core = Arc::new(Mutex::new(new_sub_core));

                // Store in the publisher core
                pub_core.subscribers.insert(new_id, Arc::clone(&new_sub_core));

                new_sub_core
            };

            // Create the new subscriber
            Subscriber {
                pub_core: Arc::downgrade(&pub_core),
                sub_core: new_sub_core
            }
        } else {
            // Publisher has gone away
            let sub_core = self.sub_core.lock().unwrap();

            // Create the new core (no publisher)
            let new_sub_core = SubCore {
                id:                 0,
                published:          false,
                waiting:            sub_core.waiting.clone(),
                notify_waiting:     vec![],
                notify_ready:       vec![],
                notify_complete:    vec![]
            };

            // Generate a new subscriber with this core
            Subscriber {
                pub_core: Weak::new(),
                sub_core: Arc::new(Mutex::new(new_sub_core))
            }
        }
    }
}

impl<Message> Drop for Subscriber<Message> {
    fn drop(&mut self) {
        let (notify_ready, notify_complete) = {
            // Lock the publisher and subscriber cores (note that the publisher core must always be locked first)
            let pub_core = self.pub_core.upgrade();

            if let Some(pub_core) = pub_core {
                // Lock the cores
                let mut pub_core = pub_core.lock().unwrap();
                let mut sub_core = self.sub_core.lock().unwrap();

                // Remove this subscriber from the publisher core
                pub_core.subscribers.remove(&sub_core.id);

                // Need to notify the core if it's waiting on this subscriber (might now be unblocked)
                let notify_ready    = sub_core.notify_ready.drain(..).collect::<Vec<_>>();
                let notify_complete = sub_core.notify_complete.drain(..).collect::<Vec<_>>(); 
                (notify_ready, notify_complete)
            } else {
                // Need to notify the core if it's waiting on this subscriber (might now be unblocked)
                let mut sub_core = self.sub_core.lock().unwrap();
                let notify_ready    = sub_core.notify_ready.drain(..).collect::<Vec<_>>();
                let notify_complete = sub_core.notify_complete.drain(..).collect::<Vec<_>>(); 
                (notify_ready, notify_complete)
            }
        };

        // After releasing the locks, notify the publisher if it's waiting on this subscriber
        notify_ready.into_iter().for_each(|notify| notify.notify());
        notify_complete.into_iter().for_each(|notify| notify.notify());
    }
}

impl<Message> Stream for Subscriber<Message> {
    type Item   = Message;
    type Error  = ();

    fn poll(&mut self) -> Poll<Option<Message>, ()> {
        let (result, notify_ready, notify_complete) = {
            // Try to read a message from the waiting list
            let mut sub_core    = self.sub_core.lock().unwrap();
            let next_message    = sub_core.waiting.pop_front();

            if let Some(next_message) = next_message {
                // If the core is empty and we have a 'complete' notification, then send that
                let notify_complete = if sub_core.waiting.len() == 0 {
                    sub_core.notify_complete.drain(..).collect::<Vec<_>>()
                } else {
                    vec![]
                };

                // If something is waiting for this subscriber to become ready, then notify it as well
                let notify_ready = sub_core.notify_ready.drain(..).collect::<Vec<_>>();

                // Return the next message if it's available
                (Ok(Async::Ready(Some(next_message))), notify_ready, notify_complete)
            } else if !sub_core.published {
                // Stream has finished if the publisher core is no longer available
                (Ok(Async::Ready(None)), vec![], vec![])
            } else {
                // If the publisher is still alive and there are no messages available, store notification and carry on
                sub_core.notify_waiting.push(task::current());

                // If anything is waiting for this subscriber to become ready, make sure it's notified
                let notify_ready    = sub_core.notify_ready.drain(..).collect::<Vec<_>>();
                let notify_complete = sub_core.notify_complete.drain(..).collect::<Vec<_>>();

                (Ok(Async::NotReady), notify_ready, notify_complete)
            }
        };

        // If there's something to notify as a result of this request, do so (note that we do this after releasing the core lock)
        notify_ready.into_iter().for_each(|ready| ready.notify());
        notify_complete.into_iter().for_each(|complete| complete.notify());

        // Return the result
        result
    }
}

///
/// It's possible to clone a subscriber stream. The clone will receive any waiting messages
/// and any future messages for the original subscriber
/// 
impl<Message: Clone> Clone for Subscriber<Message> {
    fn clone(&self) -> Subscriber<Message> {
        // TODO: 'clone' is perhaps not right as we don't reproduce the stream in its entirety. Remove this in future versions.
        // 'resubscribe' is a better description of what is happening here.
        self.resubscribe()
    }
}