krpc/
publish.rs

1#[derive(Debug)]
2struct Pair {
3	msg: Msg,
4	tx: mpsc::Sender<Bytes>,
5}
6
7#[derive(Debug)]
8enum Content {
9	Sub(u32, Pair),
10	Pub(&'static str, Vec<u8>),
11	Del(u32),
12}
13
14struct PubSub {
15	subs: HashMap<String, HashMap<u32, Option<Pair>>>,
16}
17
18impl PubSub {
19	// 添加订阅者
20	async fn add(&mut self, id: u32, pair: Pair, on_subcribe: &mut (impl FnMut(&str) -> Option<Bytes> + Send + 'static)) {
21		let name = String::from_utf8(pair.msg.name().to_vec()).unwrap();
22		if let Some(pairs) = self.subs.get_mut(&name) {
23			if pairs.get(&id).is_none() {
24				if let Some(data) = on_subcribe(&name) {
25					let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
26					pairs.insert(id, None);
27					return;
28				}
29			}
30			pairs.insert(id, Some(pair));
31		} else {
32			if let Some(data) = on_subcribe(&name) {
33				let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
34				self.subs.insert(name, HashMap::from([(id, None)]));
35			} else {
36				self.subs.insert(name, HashMap::from([(id, Some(pair))]));
37			}
38		}
39	}
40	fn del(&mut self, id: u32) {
41		for (_, pairs) in &mut self.subs {
42			let _ = pairs.remove(&id);
43		}
44	}
45	/// 若是发布内容过快可能会丢失,因为每次发布之后需要接收到订阅方的订阅信息才能再次发布
46	async fn publish(&mut self, name: &'static str, data: Vec<u8>) {
47		if let Some(pairs) = self.subs.get_mut(name) {
48			for (_, pair) in pairs {
49				if let Some(pair) = pair.take() {
50					let _ = pair.tx.send(pair.msg.encode_with_bytes(Mode::Publish, &data)).await;
51				}
52			}
53		}
54	}
55}
56
57#[derive(Clone)]
58pub struct Publisher {
59	tx: mpsc::Sender<Content>,
60}
61
62impl Publisher {
63	#[allow(unused_must_use)]
64	pub fn push<Args>(&self, topic: &'static str, args: Args)
65	where
66		Args: serde::ser::Serialize,
67	{
68		let data = rmps::encode::to_vec(&args).unwrap();
69		let tx = self.tx.clone();
70		tokio::spawn(async move {
71			tx.send(Content::Pub(topic, data)).await;
72		});
73	}
74}