// ev_apple/manager.rs

use super::{Dispatch, Senders, common::*};

/// A message that exposes a numeric identifier.
///
/// In this file the identifier is used for two things: choosing which
/// registered sender a message is handed to, and recognising the exit message.
pub trait Message {
    /// Returns the identifier of this message.
    fn id(&self) -> u32;
}

7pub struct EventsManager<T: Send + std::fmt::Debug + Clone + Message + 'static> {
8    sx: Sender<T>,
9    rx: Receiver<T>,
10    sync_sx: cb_channel::Sender<T>,
11    sync_rx: cb_channel::Receiver<T>,
12    exit_message_id: u32,
13}
14
15impl<T: Send + std::fmt::Debug + Clone + Message + 'static> EventsManager<T> {
16    pub async fn prepare(dispatch: &Dispatch<T>, exit_message_id: u32) -> Self {
17        let (sx, rx) = channel(32);
18        let (sync_sx, sync_rx) = cb_channel::unbounded();
19
20        dispatch.register(sx.clone(), sync_sx.clone()).await;
21
22        Self {
23            sx,
24            rx,
25            sync_rx,
26            sync_sx,
27            exit_message_id,
28        }
29    }
30}
31
32pub async fn start<T: Send + std::fmt::Debug + Clone + Message + 'static>(
33    manager: EventsManager<T>,
34    senders: Senders<T>,
35) {
36    let sync_rx = manager.sync_rx;
37    let sync_sx = manager.sync_sx;
38    let mut rx = manager.rx;
39    let sx = manager.sx.clone();
40    let exit_message_id = manager.exit_message_id;
41
42    tokio::task::spawn_blocking(move || {
43        while let Ok(m) = sync_rx.recv() {
44            if m.id() ==  exit_message_id {
45                break;
46            }
47            if let Err(e) = sx.send(m.clone()) {
48                log::error!("{:#?}", e);
49            }
50        }
51    });
52
53    let exit_message_id = manager.exit_message_id;
54
55    while let Ok(msg) = rx.recv().await {
56        if msg.id() == exit_message_id {
57            if let Err(e) = sync_sx.send(msg.clone()) {
58                log::error!("{:#?}", e);
59            }
60            let mut senders_inner = Vec::new();
61            for (_, sender) in senders.0.read().await.iter() {
62                senders_inner.push(sender.clone());
63            }
64            for s in senders_inner {
65                let m = msg.clone();
66                let _ = s.send(m);
67            }
68            break;
69        }
70        let s = match senders.0.read().await.get(&msg.id()) {
71            Some(s) => s.clone(),
72            None => {
73                continue;
74            }
75        };
76
77        if let Err(e) = s.send(msg) {
78            log::error!("{:#?}", e);
79        }
80    }
81}