1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use crate::event::*;
use crate::subscriber::*;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;

/// An event emitter that holds the subscribers registered for events of type `E`.
pub struct Emitter<E: Event> {
    /// Registered subscribers, kept in registration order behind a shared
    /// async read/write lock.
    subscribers: Arc<RwLock<Vec<BoxedSubscriber<E>>>>,
}

// Lints silenced because this impl provides neither a `Default` constructor
// nor an `is_empty` companion to `len`.
#[allow(clippy::new_without_default, clippy::len_without_is_empty)]
impl<E: Event + 'static> Emitter<E> {
    /// Create a new event emitter with no subscribers.
    pub fn new() -> Self {
        Emitter {
            subscribers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Return a count of how many subscribers are currently registered.
    pub async fn len(&self) -> usize {
        self.subscribers.read().await.len()
    }

    /// Register a subscriber to receive events.
    ///
    /// The subscriber is appended after all previously registered subscribers.
    /// Returns `&Self` so that registrations can be chained.
    pub async fn subscribe<L: Subscriber<E> + 'static>(&self, subscriber: L) -> &Self {
        self.subscribers.write().await.push(Box::new(subscriber));
        self
    }

    /// Register a subscriber function to receive events.
    pub async fn on<L: SubscriberFunc<E> + 'static>(&self, callback: L) -> &Self {
        self.subscribe(CallbackSubscriber::new(callback, false))
            .await
    }

    /// Register a subscriber function that will be unregistered after the first
    /// emit in which it is called. This is useful for one-time event handlers.
    pub async fn once<L: SubscriberFunc<E> + 'static>(&self, callback: L) -> &Self {
        self.subscribe(CallbackSubscriber::new(callback, true))
            .await
    }

    /// Emit the provided event to all registered subscribers. Subscribers will be
    /// called in the order they were registered.
    ///
    /// If a subscriber returns [`EventState::Continue`], the next subscriber will be called.
    /// If a subscriber returns [`EventState::Stop`], no further subscribers will be called.
    ///
    /// Every subscriber receives a shared handle to the event and to a
    /// default-initialized `E::Data` value. When complete, that data value is returned.
    ///
    /// One-time subscribers that were called during this emit are unregistered
    /// afterwards, whether the emit succeeded or failed.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a subscriber. Subscribers registered
    /// after the failing one are not called.
    pub async fn emit(&self, event: E) -> miette::Result<E::Data> {
        // The write lock is held for the whole dispatch: subscribers are reached
        // through `iter_mut` and one-time subscribers are pruned afterwards.
        let mut subscribers = self.subscribers.write().await;
        let mut called_once = HashSet::new();

        let event = Arc::new(event);
        let data = Arc::new(RwLock::new(E::Data::default()));

        // Errors are deferred rather than returned immediately so that the
        // one-time subscribers already called are still pruned below.
        let mut outcome = Ok(());

        for (index, subscriber) in subscribers.iter_mut().enumerate() {
            if subscriber.is_once() {
                called_once.insert(index);
            }

            match subscriber
                .on_emit(Arc::clone(&event), Arc::clone(&data))
                .await
            {
                Ok(EventState::Continue) => {}
                Ok(EventState::Stop) => break,
                Err(error) => {
                    outcome = Err(error);
                    break;
                }
            }
        }

        // Remove only the one-time subscribers that were called. `retain` visits
        // elements in order, so a running counter recovers each element's index.
        if !called_once.is_empty() {
            let mut index = 0;

            subscribers.retain(|_| {
                let keep = !called_once.contains(&index);
                index += 1;
                keep
            });
        }

        // Release the subscriber list before anything else is awaited.
        drop(subscribers);

        outcome?;

        let data = match Arc::try_unwrap(data) {
            Ok(lock) => lock.into_inner(),
            // A subscriber kept its handle to the data alive; take the value
            // out from behind the lock instead of panicking.
            Err(shared) => {
                let mut guard = shared.write().await;
                std::mem::take(&mut *guard)
            }
        };

        Ok(data)
    }
}