modular_rs/core/
events.rs1use futures::Sink;
2use futures_util::SinkExt;
3use parking_lot::Mutex;
4use std::marker::PhantomData;
5
6use crate::core::pattern::Pattern;
7use tokio::sync::mpsc;
8use tokio::sync::mpsc::UnboundedSender;
9
10type EventsHandler<T> = (Pattern, UnboundedSender<(String, T)>);
11
12pub struct EventsManager<T> {
13 handlers: Mutex<Vec<EventsHandler<T>>>,
14 _pd: PhantomData<T>,
15}
16
17impl<T> Default for EventsManager<T> {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl<T> EventsManager<T> {
24 pub fn new() -> Self {
25 Self {
26 handlers: Default::default(),
27 _pd: Default::default(),
28 }
29 }
30
31 pub fn publish(&self, dest: &str, data: T)
32 where
33 T: Clone + Send + Sync + 'static,
34 {
35 let mut guard = self.handlers.lock();
36
37 guard.retain(|(pattern, handler)| {
38 if pattern.matches(dest) {
39 let data = data.clone();
40 let dest = dest.to_owned();
41
42 handler.send((dest, data)).is_ok()
43 } else {
44 true
45 }
46 });
47 }
48
49 pub fn subscribe<L, E>(&self, pattern: Pattern, listener: L)
50 where
51 L: Sink<(String, T), Error = E> + Send + Sync + 'static,
52 T: Send + Sync + 'static,
53 {
54 let (tx, mut rx) = mpsc::unbounded_channel();
55 self.handlers.lock().push((pattern, tx));
56
57 tokio::spawn(async move {
58 let mut listener = Box::pin(listener);
59 while let Some((dest, data)) = rx.recv().await {
60 if listener.send((dest, data)).await.is_err() {
61 break;
62 }
63 }
64 });
65 }
66}