rmqtt_p2p_messaging/
lib.rs

1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use anyhow::anyhow;
6use async_trait::async_trait;
7use tokio::sync::RwLock;
8
9use rmqtt::{
10    context::ServerContext,
11    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
12    macros::Plugin,
13    plugin::{PackageInfo, Plugin},
14    register,
15    types::TopicName,
16    Result,
17};
18
19use crate::config::Mode;
20use config::PluginConfig;
21use rmqtt::types::ClientId;
22
23mod config;
24
// Passes the plugin constructor `P2PMessagingPlugin::new` to rmqtt's `register!` macro.
// NOTE(review): the macro expansion is not visible in this file — presumably it generates
// the plugin registration entry point; confirm against the rmqtt crate.
25register!(P2PMessagingPlugin::new);
26
/// State of the P2P messaging plugin.
///
/// The plugin installs a `MessagePublish` hook handler (see `init`) that shares
/// the configuration stored here.
27#[derive(Plugin)]
28struct P2PMessagingPlugin {
    /// Server context; used to read this plugin's configuration and to obtain
    /// the hook register.
29    scx: ServerContext,
    /// Hook register obtained from the server's hook manager; handlers are
    /// added to it in `init` and it is started/stopped with the plugin.
30    register: Box<dyn Register>,
    /// Current plugin configuration, shared with the hook handler and replaced
    /// in place by `load_config`.
31    cfg: Arc<RwLock<PluginConfig>>,
32}
33
34impl P2PMessagingPlugin {
35    #[inline]
36    async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
37        let name = name.into();
38        let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
39        let cfg = Arc::new(RwLock::new(cfg));
40        log::info!("{} P2PMessagingPlugin cfg: {:?}", name, cfg.read().await);
41        let register = scx.extends.hook_mgr().register();
42        Ok(Self { scx, register, cfg })
43    }
44}
45
46#[async_trait]
47impl Plugin for P2PMessagingPlugin {
48    #[inline]
49    async fn init(&mut self) -> Result<()> {
50        log::info!("{} init", self.name());
51        let cfg = &self.cfg;
52        self.register.add(Type::MessagePublish, Box::new(P2PMessagingHandler::new(cfg))).await;
53        Ok(())
54    }
55
56    #[inline]
57    async fn get_config(&self) -> Result<serde_json::Value> {
58        self.cfg.read().await.to_json()
59    }
60
61    #[inline]
62    async fn load_config(&mut self) -> Result<()> {
63        let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
64        *self.cfg.write().await = new_cfg;
65        log::debug!("load_config ok,  {:?}", self.cfg);
66        Ok(())
67    }
68
69    #[inline]
70    async fn start(&mut self) -> Result<()> {
71        log::info!("{} start", self.name());
72        self.register.start().await;
73        Ok(())
74    }
75
76    #[inline]
77    async fn stop(&mut self) -> Result<bool> {
78        log::info!("{} stop", self.name());
79        self.register.stop().await;
80        Ok(false)
81    }
82}
83
/// Result of successfully parsing a P2P topic.
84#[derive(Debug)]
85struct P2PMatch {
    /// Client id extracted from the topic; becomes the publish's
    /// `target_clientid` in the hook.
86    clientid: String,
    /// The topic with the `p2p` marker and the client id level removed;
    /// becomes the rewritten publish topic in the hook.
87    topic: String,
88}
89
/// Hook handler that rewrites publishes addressed through a P2P topic.
90struct P2PMessagingHandler {
    /// Plugin configuration shared with `P2PMessagingPlugin`; its `mode` is
    /// read each time a topic is parsed.
91    cfg: Arc<RwLock<PluginConfig>>,
92}
93
94impl P2PMessagingHandler {
95    fn new(cfg: &Arc<RwLock<PluginConfig>>) -> Self {
96        Self { cfg: cfg.clone() }
97    }
98
99    #[inline]
100    async fn parse_p2p_topic(&self, topic: &str) -> Result<Option<P2PMatch>> {
101        let cfg = self.cfg.read().await;
102        let mode = cfg.mode;
103        let parts: Vec<&str> = topic.split('/').collect();
104
105        let p2p_match = match mode {
106            Mode::Prefix | Mode::Both if parts.len() >= 3 && parts[0] == "p2p" => {
107                Some(P2PMatch { clientid: parts[1].to_string(), topic: parts[2..].join("/") })
108            }
109            Mode::Suffix | Mode::Both if parts.len() >= 3 && parts[parts.len() - 2] == "p2p" => {
110                let clientid = parts
111                    .last()
112                    .ok_or_else(|| anyhow!("Invalid topic format, clientid not found: {topic:?}"))?;
113                Some(P2PMatch { clientid: clientid.to_string(), topic: parts[..parts.len() - 2].join("/") })
114            }
115            _ => None,
116        };
117
118        if let Some(ref m) = p2p_match {
119            if m.clientid.is_empty() {
120                return Err(anyhow!("Invalid topic format, clientid not found: {topic:?}"));
121            }
122            if m.topic.is_empty() {
123                return Err(anyhow!("Invalid topic format, sub topic not found: {topic:?}"));
124            }
125        }
126
127        Ok(p2p_match)
128    }
129}
130
131#[async_trait]
132impl Handler for P2PMessagingHandler {
133    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
134        match param {
135            Parameter::MessagePublish(s, _f, p) => {
136                log::debug!("{:?} MessagePublish ..", s.map(|s| &s.id));
137
138                match self.parse_p2p_topic(&p.topic).await {
139                    Ok(Some(p2p_match)) => {
140                        log::debug!("{p2p_match:?}");
141                        let mut p1 = (*p).clone();
142                        p1.topic = TopicName::from(p2p_match.topic);
143                        p1.target_clientid = Some(ClientId::from(p2p_match.clientid));
144                        return (true, Some(HookResult::Publish(p1)));
145                    }
146                    Ok(None) => {}
147                    Err(e) => {
148                        log::warn!("{e:?}");
149                    }
150                }
151            }
152            _ => {
153                log::error!("parameter is: {param:?}");
154            }
155        }
156        (true, acc)
157    }
158}