next_web_dev/event/
default_application_event_multicaster.rs1use super::key::Key;
2use super::{
3 application_event::ApplicationEvent,
4 application_event_multicaster::ApplicationEventMulticaster,
5 application_listener::ApplicationListener,
6};
7
8use flume::{Receiver, Sender};
9use hashbrown::HashMap;
10use parking_lot::Mutex;
11use std::any::TypeId;
12use std::borrow::Cow;
13use std::sync::Arc;
14use tracing::{debug, info};
15
16#[derive(Clone)]
20pub struct DefaultApplicationEventMulticaster {
21 listeners: Arc<Mutex<HashMap<Key, Sender<Box<dyn ApplicationEvent>>>>>,
24 listeners_type: Arc<Mutex<HashMap<TypeId, Sender<Box<dyn ApplicationEvent>>>>>,
25
26 event_channel: Option<Receiver<(Cow<'static, str>, Box<dyn ApplicationEvent>)>>,
29}
30
31impl DefaultApplicationEventMulticaster {
32 pub fn new() -> Self {
35 DefaultApplicationEventMulticaster {
36 listeners: Arc::new(Mutex::new(HashMap::new())),
37 listeners_type: Arc::new(Mutex::new(HashMap::new())),
38
39 event_channel: None,
40 }
41 }
42
43 pub fn set_event_channel(
46 &mut self,
47 channel: Receiver<(Cow<'static, str>, Box<dyn ApplicationEvent>)>,
48 ) {
49 self.event_channel = Some(channel);
50 }
51
52 pub fn run(&self) {
55 let channel = self.event_channel.clone().unwrap();
56 let listeners = self.listeners.clone();
57 let listeners_type = self.listeners_type.clone();
58 tokio::spawn(async move {
59 while let Ok(event) = channel.recv() {
60 if event.0.is_empty() {
61 if let Some(listeners) = listeners_type.lock().get(&event.1.tid()) {
62 let _ = listeners.send(event.1);
63 }
64 } else {
65 if let Some(listeners) = listeners.lock().get(&Key::new(event.0, event.1.tid()))
66 {
67 let _ = listeners.send(event.1);
68 }
69 }
70 }
71 });
72 }
73}
74
75impl ApplicationEventMulticaster for DefaultApplicationEventMulticaster {
76 fn add_application_listener(&mut self, mut listener: Box<dyn ApplicationListener>) {
77 let id = listener.id().clone();
78 let tid = listener.tid().clone();
79
80 let (sender, receiver) = flume::unbounded();
81 if id.is_empty() {
82 if let None = self.listeners_type.lock().get(&tid) {
83 self.listeners_type.lock().insert(tid, sender);
84 };
85 } else {
86 let key = Key::new(listener.id(), listener.tid());
87 if let None = self.listeners_type.lock().get(&tid) {
88 self.listeners.lock().insert(key, sender);
89 };
90 }
91 tokio::spawn(async move {
92 while let Ok(event) = receiver.recv() {
93 listener.on_application_event(&event).await;
94 info!("Received event: {:?}", event.source());
95 }
96 });
97 debug!("Added listener for event type: {:?}, id: {}", tid, id);
98 }
99
100 fn remove_application_listener(&mut self, key: &Key) {
101 if let Some(_) = self.listeners.lock().remove(key) {
104 debug!("Removed listener for event type: {}", key);
105 };
106 }
107
108 }