1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
use super::*;
use crate::Ptr;
use std::{
collections::HashSet,
hash::{Hash, Hasher},
mem,
ops::DerefMut,
ptr,
sync::{Arc, Mutex},
task::Waker,
};
#[cfg(feature = "trace")]
mod busy {
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use tokio::sync::Notify;
pub static BUSY_COUNT: AtomicUsize = AtomicUsize::new(0);
pub static BUSY_WAKER: Notify = Notify::const_new();
pub fn set_busy() {
BUSY_COUNT.fetch_add(1, SeqCst);
}
pub fn clear_busy() {
let prev = BUSY_COUNT.fetch_sub(1, SeqCst);
debug_assert!(prev != 0);
if prev == 1 {
BUSY_WAKER.notify_waiters();
}
}
/// Ready once all eventuals readers are waiting on a new value. _Generally_
/// speaking, it is possible to ensure that a change has propagated through
/// an eventuals pipeline using this method. However, there is no guarantee
/// that this will complete in a timely fashion if ever. A sufficiently
/// layered pipeline that is always moving values through may never be idle.
/// So, this is only useful in isolated tests.
pub async fn idle() {
loop {
let notified = BUSY_WAKER.notified();
if BUSY_COUNT.load(SeqCst) == 0 {
return;
}
notified.await
}
}
}
#[cfg(feature = "trace")]
pub use busy::idle;
struct Busy<T>(T);
impl<T> Busy<T> {
fn new(value: T) -> Self {
#[cfg(feature = "trace")]
busy::set_busy();
Self(value)
}
fn unbusy(mut self) -> T {
unsafe {
let inner = ptr::read(&mut self.0);
mem::forget(self);
#[cfg(feature = "trace")]
busy::clear_busy();
inner
}
}
}
impl<T> Deref for Busy<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> Drop for Busy<T> {
fn drop(&mut self) {
#[cfg(feature = "trace")]
busy::clear_busy();
}
}
enum ChangeVal<T> {
None(Busy<()>),
Value(Busy<T>),
Finalized(Busy<Option<T>>),
Waker(Waker),
}
pub enum ChangeValNoWake<T> {
None,
Value(T),
Finalized(Option<T>),
}
pub struct Change<T> {
inner: Ptr<Mutex<ChangeVal<T>>>,
}
pub struct ChangeReader<T> {
pub change: Change<T>,
pub unsubscribe_from: Arc<SharedState<T>>,
}
impl<T> Drop for ChangeReader<T> {
fn drop(&mut self) {
let mut lock = self.unsubscribe_from.subscribers.lock().unwrap();
let mut updated: HashSet<_> = lock.deref().deref().clone();
if updated.remove(&self.change) {
*lock = Arc::new(updated);
}
}
}
impl<T> Change<T>
where
T: Value,
{
pub fn new() -> Self {
Self {
inner: Ptr::new(Mutex::new(ChangeVal::None(Busy::new(())))),
}
}
pub fn poll(
&self,
cmp: &Option<Result<T, Closed>>,
cx: &mut Context,
) -> Option<Result<T, Closed>> {
let mut lock = self.inner.lock().unwrap();
// Move the value out pre-emptively to keep things sane for the borrow checker.
// Depending on the branch ahead we'll swap in different values.
let value = mem::replace(lock.deref_mut(), ChangeVal::None(Busy::new(())));
match value {
// If there is a new value and it is different than our previously
// observed value return it. Otherwise fall back to waking later.
ChangeVal::Value(value) => {
let value = value.unbusy();
let value = Some(Ok(value));
if cmp != &value {
return value;
}
}
// If the eventual is finalized from the writer end make sure that the final value
// (if any) is returned once as though it were a normal value. Then (possibly on
// a subsequent poll) return the Err.
ChangeVal::Finalized(value) => {
if let Some(value) = value.unbusy() {
let value = Some(Ok(value));
if cmp != &value {
*lock = ChangeVal::Finalized(Busy::new(None));
return value;
}
}
return Some(Err(Closed));
}
// There is no update. The waker may need to be re-scheduled.
ChangeVal::None(_) | ChangeVal::Waker(_) => {}
}
*lock = ChangeVal::Waker(cx.waker().clone());
None
}
pub fn set_value(&self, value: &Mutex<ChangeValNoWake<T>>) {
let prev = {
// To avoid race conditions BOTH locks MUST be held. This insures
// that if new values are pushed while subscribers are being
// notified there cannot be a time that a subscriber is notified
// with the old value. Instead, it might be notified with the new
// value twice. Notice that the former is apocalyptic (missed
// updates) and the later just drains some performance for an extra
// equality check on the receiving end.
let value = value.lock().unwrap();
let mut inner = self.inner.lock().unwrap();
// Move out of inner early for borrow checker.
let prev = mem::replace(inner.deref_mut(), ChangeVal::None(Busy::new(())));
match value.deref() {
ChangeValNoWake::None => {
// Prev must be None. The only time set_value is called when
// value is None is when the value has never before been set
// (therefore prev is not ChangeValue::Finalized or
// ChangeVal::Value) and the ChangeVal has no waker because
// we are now adding it to the subscriber list.
// If this assert fails, we would want `*inner = prev;`.
debug_assert!(matches!(prev, ChangeVal::None(_)));
// Since we know this is None, there is no need to check
// for the waker (below)
return;
}
// There is an update.
ChangeValNoWake::Value(value) => {
// The previous value must not have been finalized.
// It is not possible to move from a finalized state to
// then have updates.
debug_assert!(!matches!(prev, ChangeVal::Finalized(_)));
// Set the value.
*inner = ChangeVal::Value(Busy::new(value.clone()));
}
// If closing, this is more tricky because we want to preserve
// the last update (if any) so that the final value propagates
// all the way through.
ChangeValNoWake::Finalized(finalized) => {
// Verify that it's not copying the final value over again
// because in racey situations it may have been copied once
// then had the value consumed. It wouldn't be the end of the
// world to reset the finalized state, but would result in
// some unnecessary work.
if !matches!(prev, ChangeVal::Finalized(_)) {
*inner = ChangeVal::Finalized(Busy::new(finalized.clone()));
}
}
};
prev
// Drop locks before calling wake()
};
// Race conditions here are OK. The worst that can happen
// is that Tasks are woken up unnecessarily. They would
// just return Pending the second time.
if let ChangeVal::Waker(waker) = prev {
waker.wake();
}
}
}
impl<T> Clone for Change<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> Eq for Change<T> {}
impl<T> PartialEq for Change<T> {
fn eq(&self, other: &Self) -> bool {
self.inner.eq(&other.inner)
}
}
impl<T> Hash for Change<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.inner.hash(state)
}
}