1use super::{Dispatch, Senders, common::*};
2
/// A message that can be routed by its numeric identifier.
///
/// Implementors report the identifier through [`Message::id`]; the rest of
/// this file compares it against an exit id and uses it as a lookup key.
pub trait Message {
    /// Returns the numeric identifier of this message.
    fn id(&self) -> u32;
}
6
7pub struct EventsManager<T: Send + std::fmt::Debug + Clone + Message + 'static> {
8 sx: Sender<T>,
9 rx: Receiver<T>,
10 sync_sx: cb_channel::Sender<T>,
11 sync_rx: cb_channel::Receiver<T>,
12 exit_message_id: u32,
13}
14
15impl<T: Send + std::fmt::Debug + Clone + Message + 'static> EventsManager<T> {
16 pub async fn prepare(dispatch: &Dispatch<T>, exit_message_id: u32) -> Self {
17 let (sx, rx) = channel(32);
18 let (sync_sx, sync_rx) = cb_channel::unbounded();
19
20 dispatch.register(sx.clone(), sync_sx.clone()).await;
21
22 Self {
23 sx,
24 rx,
25 sync_rx,
26 sync_sx,
27 exit_message_id,
28 }
29 }
30}
31
32pub async fn start<T: Send + std::fmt::Debug + Clone + Message + 'static>(
33 manager: EventsManager<T>,
34 senders: Senders<T>,
35) {
36 let sync_rx = manager.sync_rx;
37 let sync_sx = manager.sync_sx;
38 let mut rx = manager.rx;
39 let sx = manager.sx.clone();
40 let exit_message_id = manager.exit_message_id;
41
42 tokio::task::spawn_blocking(move || {
43 while let Ok(m) = sync_rx.recv() {
44 if m.id() == exit_message_id {
45 break;
46 }
47 if let Err(e) = sx.send(m.clone()) {
48 log::error!("{:#?}", e);
49 }
50 }
51 });
52
53 let exit_message_id = manager.exit_message_id;
54
55 while let Ok(msg) = rx.recv().await {
56 if msg.id() == exit_message_id {
57 if let Err(e) = sync_sx.send(msg.clone()) {
58 log::error!("{:#?}", e);
59 }
60 let mut senders_inner = Vec::new();
61 for (_, sender) in senders.0.read().await.iter() {
62 senders_inner.push(sender.clone());
63 }
64 for s in senders_inner {
65 let m = msg.clone();
66 let _ = s.send(m);
67 }
68 break;
69 }
70 let s = match senders.0.read().await.get(&msg.id()) {
71 Some(s) => s.clone(),
72 None => {
73 continue;
74 }
75 };
76
77 if let Err(e) = s.send(msg) {
78 log::error!("{:#?}", e);
79 }
80 }
81}