1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
use super::pubsub_core::*;
use super::subscriber_handle::*;
use futures::*;
use futures::task;
use futures::task::{Poll};
use std::sync::*;
use std::pin::{Pin};
use std::collections::{HashMap};
///
/// Represents a subscriber stream from a publisher sink
///
/// A subscriber holds a weak reference to the publisher core it was created from and a strong
/// reference to its own subscriber core. Messages are read from the subscriber core's `waiting`
/// queue via the `Stream` implementation.
///
pub struct Subscriber<Message> {
/// The publisher core (shared between all subscribers)
///
/// This is a weak reference, so it does not keep the publisher core alive: when `upgrade()` returns
/// `None` the publisher is treated as having gone away.
///
/// Note that when locking the pub_core must always be locked first (if it needs to be locked)
pub_core: Weak<Mutex<PubCore<Message>>>,
/// The subscriber core (the state belonging to this subscriber: its ID, waiting messages and wakers)
///
/// `resubscribe()` also stores a clone of this `Arc` in the publisher core's `subscribers` map, so the
/// core may be reachable from the publisher as well as from this subscriber.
///
/// Note that when locking the pub_core must always be locked first (if it needs to be locked)
sub_core: Arc<Mutex<SubCore<Message>>>
}
impl<Message> Subscriber<Message> {
///
/// Creates a new subscriber
///
pub (crate) fn new(pub_core: Weak<Mutex<PubCore<Message>>>, sub_core: Arc<Mutex<SubCore<Message>>>) -> Subscriber<Message> {
Subscriber {
pub_core,
sub_core
}
}
}
impl<Message: Clone> Subscriber<Message> {
    ///
    /// Creates another subscriber attached to the same publisher as this one.
    ///
    /// The new subscriber gets a fresh `SubscriberHandle` and starts with a copy of the messages that are
    /// currently waiting to be read from this subscriber. Messages this subscriber has already returned from
    /// its stream are no longer in that queue, so they are not replayed.
    ///
    /// If the publisher still exists, the new subscriber core is added to the publisher's `subscribers` map and
    /// marked as published. If the publisher has gone away, the new subscriber has no publisher and is marked as
    /// not published, so it carries only the copied queue.
    ///
    /// # Panics
    ///
    /// Panics if the publisher core mutex or the subscriber core mutex is poisoned.
    ///
    pub fn resubscribe(&self) -> Self {
        match self.pub_core.upgrade() {
            Some(publisher) => {
                // The guards live only inside this block, so both locks are released before the result is built
                let registered_core = {
                    // Lock order matters: publisher core first, then our own subscriber core
                    let mut publisher = publisher.lock().unwrap();
                    let existing = self.sub_core.lock().unwrap();

                    // The handle is needed twice: as the core's ID and as the key in the publisher's map
                    let handle = SubscriberHandle::new();

                    let core = Arc::new(Mutex::new(SubCore {
                        id: handle,
                        published: true,
                        waiting: existing.waiting.clone(),
                        reserved: 0,
                        notify_waiting: HashMap::new(),
                        notify_ready: HashMap::new(),
                    }));

                    // Register the new core with the publisher
                    publisher.subscribers.insert(handle, Arc::clone(&core));

                    core
                };

                Subscriber {
                    pub_core: Arc::downgrade(&publisher),
                    sub_core: registered_core
                }
            }

            None => {
                // No publisher is left: build a detached core that only holds the copied queue
                let existing = self.sub_core.lock().unwrap();

                let detached_core = SubCore {
                    id: SubscriberHandle::new(),
                    published: false,
                    waiting: existing.waiting.clone(),
                    reserved: 0,
                    notify_waiting: HashMap::new(),
                    notify_ready: HashMap::new(),
                };

                Subscriber {
                    pub_core: Weak::new(),
                    sub_core: Arc::new(Mutex::new(detached_core))
                }
            }
        }
    }
}
impl<Message> Drop for Subscriber<Message> {
    ///
    /// Detaches this subscriber when it is dropped
    ///
    /// If the publisher core still exists, this subscriber's entry is removed from its `subscribers` map.
    /// In either case every waker stored in `notify_ready` is taken out and woken once the locks have been
    /// released, so anything waiting on this subscriber gets a chance to re-check its state.
    ///
    /// Panics if either core mutex is poisoned.
    ///
    fn drop(&mut self) {
        let wakers = match self.pub_core.upgrade() {
            Some(publisher) => {
                // Lock order matters: publisher core first, then our own subscriber core
                let mut publisher = publisher.lock().unwrap();
                let mut subscriber = self.sub_core.lock().unwrap();

                // The publisher should no longer track this subscriber
                publisher.subscribers.remove(&subscriber.id);

                // Take everything that was waiting for this subscriber to become ready
                subscriber.notify_ready.drain().map(|(_, waker)| waker).collect::<Vec<_>>()
            }

            None => {
                // No publisher to update: only the pending wakers need collecting
                let mut subscriber = self.sub_core.lock().unwrap();

                subscriber.notify_ready.drain().map(|(_, waker)| waker).collect::<Vec<_>>()
            }
        };

        // Both locks are released by this point, so it is safe to wake the waiting tasks
        for waker in wakers {
            waker.wake();
        }
    }
}
impl<Message> Stream for Subscriber<Message> {
    type Item = Message;

    ///
    /// Polls for the next message queued for this subscriber
    ///
    /// * If a message is waiting it is removed from the front of the queue and returned.
    /// * If the queue is empty and the core is no longer marked as published, the stream ends (`None`).
    /// * Otherwise the task's waker is stored in `notify_waiting` under this subscriber's ID and the poll
    ///   is pending.
    ///
    /// Except when the stream ends, all wakers stored in `notify_ready` are removed and woken after the
    /// subscriber core lock has been released.
    ///
    /// Panics if the subscriber core mutex is poisoned.
    ///
    fn poll_next(self: Pin<&mut Self>, context: &mut task::Context) -> Poll<Option<Message>> {
        // Filled in while the core is locked, woken only once the lock is gone
        let mut wakers = Vec::new();

        let outcome = {
            let mut core = self.sub_core.lock().unwrap();
            let next_message = core.waiting.pop_front();

            match next_message {
                Some(message) => {
                    // A slot has been freed: anything waiting for this subscriber to be ready should hear about it
                    wakers.extend(core.notify_ready.drain().map(|(_, waker)| waker));
                    Poll::Ready(Some(message))
                }

                // Nothing queued and no publisher to supply more: the stream is finished (nothing is woken here)
                None if !core.published => Poll::Ready(None),

                None => {
                    // Publisher is still around: remember how to wake this task when a message is queued
                    let handle = core.id;
                    core.notify_waiting.insert(handle, context.waker().clone());

                    // Also let anything waiting on this subscriber's readiness know it has been polled
                    wakers.extend(core.notify_ready.drain().map(|(_, waker)| waker));
                    Poll::Pending
                }
            }
        };

        // The core lock has been released, so waking cannot re-enter it while held
        for waker in wakers {
            waker.wake();
        }

        outcome
    }
}
///
/// It's possible to clone a subscriber stream. The clone will receive any waiting messages
/// and any future messages for the original subscriber
///
impl<Message: Clone> Clone for Subscriber<Message> {
fn clone(&self) -> Subscriber<Message> {
// TODO: 'clone' is perhaps not right as we don't reproduce the stream in its entirety. Remove this in future versions.
// 'resubscribe' is a better description of what is happening here.
self.resubscribe()
}
}