1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use futures::channel::oneshot::Sender;
use std::{
collections::HashSet,
ops::Deref,
sync::{Arc, Mutex},
};
use super::{
change::{Change, ChangeReader, ChangeValNoWake},
*,
};
/// Shared state holding the subscriber set, the `last_write` value, and a
/// one-shot sender that is signalled when this value is dropped.
pub struct SharedState<T> {
// This takes a cue from the .NET event implementation,
// which bets that:
//   * Typically the number of subscribers is relatively small.
//   * Modifying the subscriber list is less frequent than posting events.
// Taken together, these advocate for a snapshot-at-a-time style of concurrency,
// which makes snapshotting very cheap but updates expensive.
//
// There is maybe a case for some kind of persistent immutable data structure
// here (finally, for once in my career...). `im` on crates.io is a candidate.
// (Look out though: `im` uses the MPL license, which is incompatible with MIT.)
// It would probably require a lot of subscribers before paying off - which
// itself requires heavy linear costs. So, I am still skeptical.
/// The current set of subscribers. The inner `Arc` lets `notify_all` take a
/// snapshot by cloning the pointer under the lock and then iterate after the
/// lock has been released.
pub subscribers: Mutex<Arc<HashSet<Change<T>>>>,
/// The value handed to each subscriber's `set_value` by `notify_one`.
/// Initialised to `ChangeValNoWake::None` by `new`.
pub last_write: Mutex<ChangeValNoWake<T>>,
/// One-shot sender fired from `Drop`. It is an `Option` so that `drop` can
/// `take()` it out of `&mut self` and consume it by value.
writer_notify: Option<Sender<()>>,
}
impl<T> Drop for SharedState<T> {
    /// Sends `()` on `writer_notify`, if the sender is still present.
    fn drop(&mut self) {
        // `take()` moves the sender out and leaves `None` behind, which is what
        // allows the by-value `send` below from behind `&mut self`.
        if let Some(sender) = self.writer_notify.take() {
            // A failed send only means the receiving half has already been
            // dropped. Since the whole purpose of this notification is to
            // report a drop, there is nothing useful to do with that error.
            let _ = sender.send(());
        }
    }
}
impl<T> SharedState<T>
where
    T: Value,
{
    /// Creates a state with no subscribers and `last_write` set to
    /// `ChangeValNoWake::None`.
    ///
    /// `writer_notify` is stored and signalled when the returned value is
    /// dropped.
    pub fn new(writer_notify: Sender<()>) -> Self {
        Self {
            subscribers: Mutex::new(Arc::new(HashSet::new())),
            last_write: Mutex::new(ChangeValNoWake::None),
            writer_notify: Some(writer_notify),
        }
    }

    /// Calls [`Self::notify_one`] for every subscriber present at the moment
    /// the snapshot is taken.
    ///
    /// # Panics
    ///
    /// Panics if the `subscribers` mutex is poisoned.
    pub fn notify_all(&self) {
        // Only the `Arc` pointer is cloned while the lock is held; the
        // iteration below runs after the guard has been released.
        let snapshot = {
            let guard = self.subscribers.lock().unwrap();
            Arc::clone(guard.deref())
        };
        for subscriber in snapshot.iter() {
            self.notify_one(subscriber);
        }
    }

    /// Hands `last_write` to a single subscriber via `Change::set_value`.
    pub fn notify_one(&self, subscriber: &Change<T>) {
        subscriber.set_value(&self.last_write);
    }

    /// Registers a new subscriber and returns the reader for it.
    ///
    /// The returned `ChangeReader` keeps this `Arc<Self>` in its
    /// `unsubscribe_from` field.
    ///
    /// # Panics
    ///
    /// Panics if the `subscribers` mutex is poisoned.
    pub fn subscribe(self: Arc<Self>) -> ChangeReader<T> {
        let change: Change<T> = Change::new();
        {
            let mut guard = self.subscribers.lock().unwrap();
            // Copy-on-write: `Arc::make_mut` clones the set only when a
            // snapshot taken by `notify_all` is still alive. Otherwise the set
            // is updated in place, avoiding a full copy on every subscribe.
            // Outstanding snapshots are never modified either way.
            Arc::make_mut(&mut *guard).insert(change.clone());
        }
        // Must notify AFTER it's in the subscriber list to avoid missing updates.
        self.notify_one(&change);
        ChangeReader {
            change,
            unsubscribe_from: self,
        }
    }
}