1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::{DeleteSet, StateVector, Transaction};
use rand::RngCore;
use std::collections::HashMap;
use std::ptr::NonNull;
/// Registry of event callbacks, keyed by [`SubscriptionId`].
///
/// The callback map is kept inside a `Box`. `subscribe` stores a raw pointer
/// to that boxed map in every [`Subscription`] it returns, so the pointer
/// targets the heap allocation rather than this wrapper value.
#[repr(transparent)]
pub(crate) struct EventHandler<T>(Box<Subscriptions<T>>);
/// Identifier under which a single callback is registered in an event handler.
pub type SubscriptionId = u32;
/// Callback table of an event handler: every registered closure is stored
/// under its [`SubscriptionId`] and receives the current transaction together
/// with the event payload.
type Subscriptions<T> = HashMap<SubscriptionId, Box<dyn Fn(&Transaction, &T)>>;
impl<T> EventHandler<T> {
pub fn new() -> Self {
EventHandler(Box::new(Subscriptions::new()))
}
pub fn subscribe<F>(&mut self, f: F) -> Subscription<T>
where
F: Fn(&Transaction, &T) -> () + 'static,
{
let mut rng = rand::thread_rng();
let id = rng.next_u32();
self.0.insert(id, Box::new(f));
let subscriptions = NonNull::from(self.0.as_mut());
Subscription { id, subscriptions }
}
pub fn unsubscribe(&mut self, subscription_id: u32) {
self.0.remove(&subscription_id);
}
pub fn publish(&self, txn: &Transaction, arg: &T) {
for f in self.0.values() {
f(txn, arg);
}
}
pub fn has_subscribers(&self) -> bool {
!self.0.is_empty()
}
fn subscription_count(&self) -> usize {
self.0.len()
}
}
impl<T> Default for EventHandler<T> {
fn default() -> Self {
Self::new()
}
}
/// Guard representing one registered callback.
///
/// Dropping the guard removes the callback stored under `id` from the callback
/// map it points to. Converting the guard into a [`SubscriptionId`] skips that
/// removal and leaves the callback registered.
///
/// NOTE(review): the guard holds a raw pointer to the handler's callback map
/// and nothing visible here ties its lifetime to the handler; it presumably
/// must not outlive the handler that created it — confirm at call sites.
#[must_use = "dropping a `Subscription` immediately unregisters its callback"]
pub struct Subscription<T> {
    /// Key of the callback inside the callback map.
    id: SubscriptionId,
    /// Raw pointer to the callback map the callback was inserted into.
    subscriptions: NonNull<Subscriptions<T>>,
}
/// Turns a subscription guard into its bare identifier without unsubscribing.
///
/// Implemented as `From` so that the standard blanket impl still provides
/// `Into<SubscriptionId>` for `Subscription<T>`; existing `.into()` calls keep
/// working unchanged.
impl<T> From<Subscription<T>> for SubscriptionId {
    fn from(subscription: Subscription<T>) -> Self {
        let id = subscription.id;
        // Skip the guard's `Drop`, which would otherwise remove the callback;
        // the caller now owns the id and is responsible for unsubscribing.
        std::mem::forget(subscription);
        id
    }
}
impl<T> Drop for Subscription<T> {
    /// Unregisters the callback stored under this guard's id.
    fn drop(&mut self) {
        // SAFETY (NOTE(review)): the pointer was taken from the handler's boxed
        // callback map in `EventHandler::subscribe`. Dereferencing it is only
        // valid while that map is still allocated and not otherwise borrowed;
        // nothing in this type enforces that — confirm that guards never
        // outlive their handler.
        let registry = unsafe { self.subscriptions.as_mut() };
        registry.remove(&self.id);
    }
}
/// Event payload carrying a byte buffer.
///
/// NOTE(review): presumably the binary-encoded document update delivered to
/// update observers — confirm against the code that publishes this event.
pub struct UpdateEvent {
    /// Raw bytes of the update.
    pub update: Vec<u8>,
}

impl UpdateEvent {
    /// Wraps `update` in a new event; the bytes are moved in as-is.
    pub(crate) fn new(update: Vec<u8>) -> Self {
        Self { update }
    }
}
/// Event payload describing a transaction.
///
/// NOTE(review): the field meanings below are inferred from their names and
/// types only — confirm against the code that constructs this event.
pub struct AfterTransactionEvent {
    /// Presumably the document's state vector before the transaction ran.
    pub before_state: StateVector,
    /// Presumably the document's state vector after the transaction ran.
    pub after_state: StateVector,
    /// Presumably the set of deletions performed by the transaction.
    pub delete_set: DeleteSet,
}
#[cfg(test)]
mod test {
    use crate::event::EventHandler;
    use crate::Doc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Callbacks fire while their guards are alive and stop firing once the
    /// guards have been dropped.
    #[test]
    fn subscription() {
        let doc = Doc::new();
        let txn = doc.transact();
        let mut handler: EventHandler<u32> = EventHandler::new();

        // Cells the two callbacks write into: the raw value and its double.
        let plain = Arc::new(AtomicU32::new(0));
        let doubled = Arc::new(AtomicU32::new(0));

        {
            let plain_sink = plain.clone();
            let doubled_sink = doubled.clone();
            let _first =
                handler.subscribe(move |_, value| plain_sink.store(*value, Ordering::Release));
            let _second = handler
                .subscribe(move |_, value| doubled_sink.store(*value * 2, Ordering::Release));
            assert_eq!(handler.subscription_count(), 2);

            handler.publish(&txn, &1);
            assert_eq!(plain.load(Ordering::Acquire), 1);
            assert_eq!(doubled.load(Ordering::Acquire), 2);

            handler.publish(&txn, &2);
            assert_eq!(plain.load(Ordering::Acquire), 2);
            assert_eq!(doubled.load(Ordering::Acquire), 4);
            // Both guards go out of scope here, which unregisters the callbacks.
        }

        assert_eq!(handler.subscription_count(), 0);

        // With no subscribers left, publishing must not touch the cells.
        handler.publish(&txn, &3);
        assert_eq!(plain.load(Ordering::Acquire), 2);
        assert_eq!(doubled.load(Ordering::Acquire), 4);
    }
}