1#[derive(Debug)]
2struct Pair {
3 msg: Msg,
4 tx: mpsc::Sender<Bytes>,
5}
6
7#[derive(Debug)]
8enum Content {
9 Sub(u32, Pair),
10 Pub(&'static str, Vec<u8>),
11 Del(u32),
12}
13
14struct PubSub {
15 subs: HashMap<String, HashMap<u32, Option<Pair>>>,
16}
17
18impl PubSub {
19 async fn add(&mut self, id: u32, pair: Pair, on_subcribe: &mut (impl FnMut(&str) -> Option<Bytes> + Send + 'static)) {
21 let name = String::from_utf8(pair.msg.name().to_vec()).unwrap();
22 if let Some(pairs) = self.subs.get_mut(&name) {
23 if pairs.get(&id).is_none() {
24 if let Some(data) = on_subcribe(&name) {
25 let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
26 pairs.insert(id, None);
27 return;
28 }
29 }
30 pairs.insert(id, Some(pair));
31 } else {
32 if let Some(data) = on_subcribe(&name) {
33 let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
34 self.subs.insert(name, HashMap::from([(id, None)]));
35 } else {
36 self.subs.insert(name, HashMap::from([(id, Some(pair))]));
37 }
38 }
39 }
40 fn del(&mut self, id: u32) {
41 for (_, pairs) in &mut self.subs {
42 let _ = pairs.remove(&id);
43 }
44 }
45 async fn publish(&mut self, name: &'static str, data: Vec<u8>) {
47 if let Some(pairs) = self.subs.get_mut(name) {
48 for (_, pair) in pairs {
49 if let Some(pair) = pair.take() {
50 let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
51 }
52 }
53 }
54 }
55}
56
57#[derive(Clone)]
58pub struct Publisher {
59 tx: mpsc::Sender<Content>,
60}
61
62impl Publisher {
63 #[allow(unused_must_use)]
64 pub fn push<Args>(&self, topic: &'static str, args: Args)
65 where
66 Args: serde::ser::Serialize,
67 {
68 let data = rmps::encode::to_vec(&args).unwrap();
69 let tx = self.tx.clone();
70 tokio::spawn(async move {
71 tx.send(Content::Pub(topic, data)).await;
72 });
73 }
74}