1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::{DeleteSet, StateVector, Transaction};
use rand::RngCore;
use std::collections::HashMap;
use std::ptr::NonNull;

/// Registry of callbacks that can be notified with a transaction and an event payload of
/// type `T`.
///
/// The callback map lives behind a `Box`, so it stays at a stable heap address even when the
/// handler value itself is moved. `subscribe` hands out handles that keep a raw pointer to
/// that map.
#[repr(transparent)]
pub(crate) struct EventHandler<T>(Box<Subscriptions<T>>);

/// Identifier under which a single callback is registered in an event handler.
pub type SubscriptionId = u32;

/// Map from a subscription id to the boxed callback registered under that id.
type Subscriptions<T> = HashMap<SubscriptionId, Box<dyn Fn(&Transaction, &T) -> ()>>;

impl<T> EventHandler<T> {
    pub fn new() -> Self {
        EventHandler(Box::new(Subscriptions::new()))
    }

    pub fn subscribe<F>(&mut self, f: F) -> Subscription<T>
    where
        F: Fn(&Transaction, &T) -> () + 'static,
    {
        let mut rng = rand::thread_rng();
        let id = rng.next_u32();
        self.0.insert(id, Box::new(f));
        let subscriptions = NonNull::from(self.0.as_mut());
        Subscription { id, subscriptions }
    }

    pub fn unsubscribe(&mut self, subscription_id: u32) {
        self.0.remove(&subscription_id);
    }

    pub fn publish(&self, txn: &Transaction, arg: &T) {
        for f in self.0.values() {
            f(txn, arg);
        }
    }

    pub fn has_subscribers(&self) -> bool {
        !self.0.is_empty()
    }

    fn subscription_count(&self) -> usize {
        self.0.len()
    }
}

impl<T> Default for EventHandler<T> {
    fn default() -> Self {
        Self::new()
    }
}

/// A subscription handle to a custom user-defined callback registered in an event handler.
/// When dropped, it unsubscribes the corresponding callback.
///
/// The handle refers to the handler's callback map through a raw pointer; no lifetime ties the
/// two together, so the handle must not be dropped after the map it points to has been freed.
pub struct Subscription<T> {
    /// Id under which the callback is registered.
    id: SubscriptionId,
    /// Raw pointer to the callback map that the subscription was created from.
    subscriptions: NonNull<Subscriptions<T>>,
}

impl<T> Into<SubscriptionId> for Subscription<T> {
    /// Turns the handle into its bare id without unsubscribing the callback.
    fn into(self) -> SubscriptionId {
        // `ManuallyDrop` suppresses the handle's `Drop`, so the callback stays registered.
        let handle = std::mem::ManuallyDrop::new(self);
        handle.id
    }
}

impl<T> Drop for Subscription<T> {
    fn drop(&mut self) {
        let subs = unsafe { self.subscriptions.as_mut() };
        subs.remove(&self.id);
    }
}

/// Event handed to callbacks registered in the event handler. Carries data about the state of
/// an update.
pub struct UpdateEvent {
    /// The update that is about to be applied. It contains information about all inserted
    /// blocks which have been sent from a remote peer.
    pub update: Vec<u8>,
}

impl UpdateEvent {
    /// Wraps the given `update` payload in an event.
    pub(crate) fn new(update: Vec<u8>) -> Self {
        Self { update }
    }
}

/// Holds transaction update information from a commit after state vectors have been compressed.
pub struct AfterTransactionEvent {
    /// State vector from before the transaction (assumed from the field name — TODO confirm).
    pub before_state: StateVector,
    /// State vector from after the transaction (assumed from the field name — TODO confirm).
    pub after_state: StateVector,
    /// Delete set associated with the transaction (presumably the deletions it performed —
    /// TODO confirm).
    pub delete_set: DeleteSet,
}

#[cfg(test)]
mod test {
    use crate::event::EventHandler;
    use crate::Doc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Callbacks fire while their subscription handles are alive and are unregistered as soon
    /// as the handles go out of scope.
    #[test]
    fn subscription() {
        let doc = Doc::new();
        let txn = doc.transact(); // only needed as an argument for `publish`

        let mut handler: EventHandler<u32> = EventHandler::new();
        let first = Arc::new(AtomicU32::new(0));
        let second = Arc::new(AtomicU32::new(0));
        // Snapshot of the values most recently written by the two callbacks.
        let observed = || (first.load(Ordering::Acquire), second.load(Ordering::Acquire));

        {
            let first_sink = Arc::clone(&first);
            let second_sink = Arc::clone(&second);

            let _s1 =
                handler.subscribe(move |_, value| first_sink.store(*value, Ordering::Release));
            let _s2 = handler
                .subscribe(move |_, value| second_sink.store(*value * 2, Ordering::Release));
            assert_eq!(handler.subscription_count(), 2);

            handler.publish(&txn, &1);
            assert_eq!(observed(), (1, 2));

            handler.publish(&txn, &2);
            assert_eq!(observed(), (2, 4));
        }

        // Both handles left the scope above, so their callbacks must be gone.
        assert_eq!(handler.subscription_count(), 0);

        // With no callbacks registered, publishing must not change the observed values.
        handler.publish(&txn, &3);
        assert_eq!(observed(), (2, 4));
    }
}