rmqtt_p2p_messaging/
lib.rs1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use anyhow::anyhow;
6use async_trait::async_trait;
7use tokio::sync::RwLock;
8
9use rmqtt::{
10 context::ServerContext,
11 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
12 macros::Plugin,
13 plugin::{PackageInfo, Plugin},
14 register,
15 types::TopicName,
16 Result,
17};
18
19use crate::config::Mode;
20use config::PluginConfig;
21use rmqtt::types::ClientId;
22
23mod config;
24
25register!(P2PMessagingPlugin::new);
26
27#[derive(Plugin)]
28struct P2PMessagingPlugin {
29 scx: ServerContext,
30 register: Box<dyn Register>,
31 cfg: Arc<RwLock<PluginConfig>>,
32}
33
34impl P2PMessagingPlugin {
35 #[inline]
36 async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
37 let name = name.into();
38 let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
39 let cfg = Arc::new(RwLock::new(cfg));
40 log::info!("{} P2PMessagingPlugin cfg: {:?}", name, cfg.read().await);
41 let register = scx.extends.hook_mgr().register();
42 Ok(Self { scx, register, cfg })
43 }
44}
45
46#[async_trait]
47impl Plugin for P2PMessagingPlugin {
48 #[inline]
49 async fn init(&mut self) -> Result<()> {
50 log::info!("{} init", self.name());
51 let cfg = &self.cfg;
52 self.register.add(Type::MessagePublish, Box::new(P2PMessagingHandler::new(cfg))).await;
53 Ok(())
54 }
55
56 #[inline]
57 async fn get_config(&self) -> Result<serde_json::Value> {
58 self.cfg.read().await.to_json()
59 }
60
61 #[inline]
62 async fn load_config(&mut self) -> Result<()> {
63 let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
64 *self.cfg.write().await = new_cfg;
65 log::debug!("load_config ok, {:?}", self.cfg);
66 Ok(())
67 }
68
69 #[inline]
70 async fn start(&mut self) -> Result<()> {
71 log::info!("{} start", self.name());
72 self.register.start().await;
73 Ok(())
74 }
75
76 #[inline]
77 async fn stop(&mut self) -> Result<bool> {
78 log::info!("{} stop", self.name());
79 self.register.stop().await;
80 Ok(false)
81 }
82}
83
84#[derive(Debug)]
85struct P2PMatch {
86 clientid: String,
87 topic: String,
88}
89
90struct P2PMessagingHandler {
91 cfg: Arc<RwLock<PluginConfig>>,
92}
93
94impl P2PMessagingHandler {
95 fn new(cfg: &Arc<RwLock<PluginConfig>>) -> Self {
96 Self { cfg: cfg.clone() }
97 }
98
99 #[inline]
100 async fn parse_p2p_topic(&self, topic: &str) -> Result<Option<P2PMatch>> {
101 let cfg = self.cfg.read().await;
102 let mode = cfg.mode;
103 let parts: Vec<&str> = topic.split('/').collect();
104
105 let p2p_match = match mode {
106 Mode::Prefix | Mode::Both if parts.len() >= 3 && parts[0] == "p2p" => {
107 Some(P2PMatch { clientid: parts[1].to_string(), topic: parts[2..].join("/") })
108 }
109 Mode::Suffix | Mode::Both if parts.len() >= 3 && parts[parts.len() - 2] == "p2p" => {
110 let clientid = parts
111 .last()
112 .ok_or_else(|| anyhow!("Invalid topic format, clientid not found: {topic:?}"))?;
113 Some(P2PMatch { clientid: clientid.to_string(), topic: parts[..parts.len() - 2].join("/") })
114 }
115 _ => None,
116 };
117
118 if let Some(ref m) = p2p_match {
119 if m.clientid.is_empty() {
120 return Err(anyhow!("Invalid topic format, clientid not found: {topic:?}"));
121 }
122 if m.topic.is_empty() {
123 return Err(anyhow!("Invalid topic format, sub topic not found: {topic:?}"));
124 }
125 }
126
127 Ok(p2p_match)
128 }
129}
130
131#[async_trait]
132impl Handler for P2PMessagingHandler {
133 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
134 match param {
135 Parameter::MessagePublish(s, _f, p) => {
136 log::debug!("{:?} MessagePublish ..", s.map(|s| &s.id));
137
138 match self.parse_p2p_topic(&p.topic).await {
139 Ok(Some(p2p_match)) => {
140 log::debug!("{p2p_match:?}");
141 let mut p1 = (*p).clone();
142 p1.topic = TopicName::from(p2p_match.topic);
143 p1.target_clientid = Some(ClientId::from(p2p_match.clientid));
144 return (true, Some(HookResult::Publish(p1)));
145 }
146 Ok(None) => {}
147 Err(e) => {
148 log::warn!("{e:?}");
149 }
150 }
151 }
152 _ => {
153 log::error!("parameter is: {param:?}");
154 }
155 }
156 (true, acc)
157 }
158}