modular_rs/core/
events.rs

1use futures::Sink;
2use futures_util::SinkExt;
3use parking_lot::Mutex;
4use std::marker::PhantomData;
5
6use crate::core::pattern::Pattern;
7use tokio::sync::mpsc;
8use tokio::sync::mpsc::UnboundedSender;
9
10type EventsHandler<T> = (Pattern, UnboundedSender<(String, T)>);
11
12pub struct EventsManager<T> {
13    handlers: Mutex<Vec<EventsHandler<T>>>,
14    _pd: PhantomData<T>,
15}
16
17impl<T> Default for EventsManager<T> {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl<T> EventsManager<T> {
24    pub fn new() -> Self {
25        Self {
26            handlers: Default::default(),
27            _pd: Default::default(),
28        }
29    }
30
31    pub fn publish(&self, dest: &str, data: T)
32    where
33        T: Clone + Send + Sync + 'static,
34    {
35        let mut guard = self.handlers.lock();
36
37        guard.retain(|(pattern, handler)| {
38            if pattern.matches(dest) {
39                let data = data.clone();
40                let dest = dest.to_owned();
41
42                handler.send((dest, data)).is_ok()
43            } else {
44                true
45            }
46        });
47    }
48
49    pub fn subscribe<L, E>(&self, pattern: Pattern, listener: L)
50    where
51        L: Sink<(String, T), Error = E> + Send + Sync + 'static,
52        T: Send + Sync + 'static,
53    {
54        let (tx, mut rx) = mpsc::unbounded_channel();
55        self.handlers.lock().push((pattern, tx));
56
57        tokio::spawn(async move {
58            let mut listener = Box::pin(listener);
59            while let Some((dest, data)) = rx.recv().await {
60                if listener.send((dest, data)).await.is_err() {
61                    break;
62                }
63            }
64        });
65    }
66}