1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
use crate::{EventIter, EventStream, Sender};

use std::{
    any::{Any, TypeId},
    collections::{BTreeSet, HashMap},
    marker::PhantomData,
};

type SenderList = Vec<(Id, Box<dyn Any>)>;

/// The id of the mapped Sender
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct Id(usize);

/// An event map which allows sending messages to a set of channels
#[derive(Default)]
pub struct EventMap {
    inner: HashMap<TypeId, SenderList>,
    id: usize,
}

impl EventMap {
    /// Create a new EventMap
    pub fn new() -> Self {
        Self::default()
    }

    /// Register this type with the EventMap, returning a clonable Receiver end
    pub fn register_stream<T: Clone + 'static>(&mut self) -> EventStream<T> {
        let (tx, rx) = crate::channel::unbounded();
        self.inner
            .entry(TypeId::of::<T>())
            .or_default()
            .push((Id(self.id), Box::new(tx)));
        self.id += 1;
        EventStream { inner: rx }
    }

    /// Register this type with the EventMap, returning a clonable Receiver end
    pub fn register_iter<T: Clone + 'static>(&mut self) -> EventIter<T> {
        let (tx, rx) = crate::channel::unbounded();
        self.inner
            .entry(TypeId::of::<T>())
            .or_default()
            .push((Id(self.id), Box::new(tx)));
        self.id += 1;
        EventIter { inner: rx }
    }

    /// Send this message to anything listening for it
    ///
    /// This will automatically clean up any stale senders after it fails to send
    pub fn send<T: Clone + 'static>(&mut self, msg: T) -> bool {
        if self.active::<T>() == 0 {
            return false;
        }

        let handlers = match self.get_senders::<T>() {
            Some(handlers) => handlers,
            None => return false,
        };

        let mut bad = BTreeSet::new();
        for (id, handler) in handlers {
            if let Err(..) = handler.send(msg.clone()) {
                // remove this id from the map
                bad.insert(id);
            }
        }
        self.remove::<T>(bad);

        true
    }

    /// Get the number of potential listeners for this message
    pub fn active<T: 'static>(&self) -> usize {
        self.inner
            .get(&TypeId::of::<T>())
            .map(Vec::len)
            .unwrap_or_default()
    }

    /// Determine whether there are any listeners for this message
    pub fn is_empty<T: 'static>(&self) -> bool {
        self.active::<T>() == 0
    }

    /// Get an iterator of all senders for this type
    ///
    /// The iterator will be over the `(Id, Sender<T>)`
    ///
    /// This returns None if no active senders are available
    pub fn get_senders<T: 'static>(&self) -> Option<Senders<'_, T>> {
        self.inner.get(&TypeId::of::<T>()).map(|list| Senders {
            inner: list.iter(),
            marker: PhantomData,
        })
    }

    /// Reset this EventMap, causing all pending Receivers to end.
    ///
    /// After doing this, you'll need to re-register all subscriptions
    pub fn reset(&mut self) {
        std::mem::take(&mut self.inner);
        self.id = 0;
    }

    pub(crate) fn remove<T: 'static>(&mut self, mut values: BTreeSet<Id>) {
        if values.is_empty() {
            // quick path because remove is called every dispatch
            return;
        }

        if let Some(inner) = self.inner.get_mut(&TypeId::of::<T>()) {
            // inverted so we remove them
            inner.retain(|(id, _)| !values.remove(&id))
        }
    }
}

/// An iterator over Senders for this message
///
/// This produces the `Id` and the `Sender` for that type
pub struct Senders<'a, T: 'static> {
    inner: std::slice::Iter<'a, (Id, Box<dyn Any>)>,
    marker: PhantomData<T>,
}

impl<'a, T: 'static> Iterator for Senders<'a, T> {
    type Item = (Id, Sender<T>);
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().and_then(|(id, d)| {
            // TODO this should assert that it still exists
            let sender = d.downcast_ref::<Sender<T>>().cloned()?;
            Some((*id, sender))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn event_map_async() {
        futures_lite::future::block_on(async move {
            use futures_lite::StreamExt as _;

            #[derive(Clone, Debug, PartialEq)]
            struct Message {
                data: String,
            }

            let mut map = EventMap::new();

            // subscriptions are initially empty
            assert_eq!(map.is_empty::<i32>(), true);
            assert_eq!(map.is_empty::<String>(), true);
            assert_eq!(map.is_empty::<Message>(), true);

            // subscribe two i32 twice
            assert_eq!(map.active::<i32>(), 0);
            let mut i1 = map.register_stream::<i32>();
            let mut i2 = map.register_stream::<i32>();
            // we should have 2 subscriptions active
            assert_eq!(map.active::<i32>(), 2);

            // subscribe to our custom type twice
            assert_eq!(map.active::<Message>(), 0);
            let mut m1 = map.register_stream::<Message>();
            let mut m2 = map.register_stream::<Message>();
            assert_eq!(map.active::<Message>(), 2);

            // send an i32
            assert_eq!(map.send(42_i32), true);

            // send our message
            let msg = Message {
                data: String::from("hello world"),
            };
            assert_eq!(map.send(msg.clone()), true);

            // will block until we get our number
            assert_eq!(i1.next().await.unwrap(), 42);
            // and the other one will also get it
            assert_eq!(i2.next().await.unwrap(), 42);

            // will block until we get our message
            assert_eq!(m1.next().await.unwrap(), msg);
            // and the other one will also get it
            assert_eq!(m2.next().await.unwrap(), msg);

            // no one is listening for () so this'll return false
            assert_eq!(map.send(()), false);

            // drop a subscription
            drop(i1);
            // and send a new value
            assert_eq!(map.send(43_i32), true);
            // only i2 will get it
            assert_eq!(i2.next().await.unwrap(), 43);
            // and it cleaned up the dropped subscription
            assert_eq!(map.active::<i32>(), 1);
        });
    }

    #[test]
    fn event_map() {
        #[derive(Clone, Debug, PartialEq)]
        struct Message {
            data: String,
        }

        let mut map = EventMap::new();

        // subscriptions are initially empty
        assert_eq!(map.is_empty::<i32>(), true);
        assert_eq!(map.is_empty::<String>(), true);
        assert_eq!(map.is_empty::<Message>(), true);

        // subscribe two i32 twice
        assert_eq!(map.active::<i32>(), 0);
        let mut i1 = map.register_iter::<i32>();
        let mut i2 = map.register_iter::<i32>();
        // we should have 2 subscriptions active
        assert_eq!(map.active::<i32>(), 2);

        // subscribe to our custom type twice
        assert_eq!(map.active::<Message>(), 0);
        let mut m1 = map.register_iter::<Message>();
        let mut m2 = map.register_iter::<Message>();
        assert_eq!(map.active::<Message>(), 2);

        // send an i32
        assert_eq!(map.send(42_i32), true);

        // send our message
        let msg = Message {
            data: String::from("hello world"),
        };
        assert_eq!(map.send(msg.clone()), true);

        // will block until we get our number
        assert_eq!(i1.next().unwrap(), 42);
        // and the other one will also get it
        assert_eq!(i2.next().unwrap(), 42);

        // will block until we get our message
        assert_eq!(m1.next().unwrap(), msg);
        // and the other one will also get it
        assert_eq!(m2.next().unwrap(), msg);

        // no one is listening for () so this'll return false
        assert_eq!(map.send(()), false);

        // drop a subscription
        drop(i1);
        // and send a new value
        assert_eq!(map.send(43_i32), true);
        // only i2 will get it
        assert_eq!(i2.next().unwrap(), 43);
        // and it cleaned up the dropped subscription
        assert_eq!(map.active::<i32>(), 1);
    }
}