rmqtt_topic_rewrite/
lib.rs

1#![deny(unsafe_code)]
2
3use std::ops::DerefMut;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use tokio::sync::RwLock;
9
10use rmqtt::{
11    context::ServerContext,
12    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
13    macros::Plugin,
14    plugin::{PackageInfo, Plugin},
15    register,
16    session::Session,
17    types::{Publish, Topic, TopicFilter, TopicName},
18    Result,
19};
20
21use config::{Action, DestTopicItem, PluginConfig, Rule};
22
23mod config;
24
25register!(TopicRewritePlugin::new);
26
27#[derive(Plugin)]
28struct TopicRewritePlugin {
29    scx: ServerContext,
30    register: Box<dyn Register>,
31    cfg: Arc<RwLock<PluginConfig>>,
32}
33
34impl TopicRewritePlugin {
35    #[inline]
36    async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
37        let name = name.into();
38        let cfg = scx.plugins.read_config::<PluginConfig>(&name)?;
39        let cfg = Arc::new(RwLock::new(cfg));
40        log::info!("{} TopicRewritePlugin cfg: {:?}", name, cfg.read().await);
41        let register = scx.extends.hook_mgr().register();
42        Ok(Self { scx, register, cfg })
43    }
44}
45
46#[async_trait]
47impl Plugin for TopicRewritePlugin {
48    #[inline]
49    async fn init(&mut self) -> Result<()> {
50        log::info!("{} init", self.name());
51        let cfg = &self.cfg;
52        let priority = cfg.read().await.priority;
53        self.register
54            .add_priority(Type::MessagePublish, priority, Box::new(TopicRewriteHandler::new(cfg)))
55            .await;
56        self.register
57            .add_priority(Type::ClientSubscribe, priority, Box::new(TopicRewriteHandler::new(cfg)))
58            .await;
59        self.register
60            .add_priority(Type::ClientUnsubscribe, priority, Box::new(TopicRewriteHandler::new(cfg)))
61            .await;
62        Ok(())
63    }
64
65    #[inline]
66    async fn get_config(&self) -> Result<serde_json::Value> {
67        self.cfg.read().await.to_json()
68    }
69
70    #[inline]
71    async fn load_config(&mut self) -> Result<()> {
72        let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
73        *self.cfg.write().await = new_cfg;
74        log::debug!("load_config ok,  {:?}", self.cfg);
75        Ok(())
76    }
77
78    #[inline]
79    async fn start(&mut self) -> Result<()> {
80        log::info!("{} start", self.name());
81        self.register.start().await;
82        Ok(())
83    }
84
85    #[inline]
86    async fn stop(&mut self) -> Result<bool> {
87        log::info!("{} stop", self.name());
88        self.register.stop().await;
89        Ok(false)
90    }
91}
92
93struct TopicRewriteHandler {
94    cfg: Arc<RwLock<PluginConfig>>,
95}
96
97impl TopicRewriteHandler {
98    fn new(cfg: &Arc<RwLock<PluginConfig>>) -> Self {
99        Self { cfg: cfg.clone() }
100    }
101
102    #[inline]
103    pub async fn rewrite_publish_topic(&self, s: Option<&Session>, p: &Publish) -> Result<Option<Publish>> {
104        match self.rewrite_topic(Action::Publish, s, &p.topic).await? {
105            Some(topic) => {
106                log::debug!("new_topic: {topic}");
107                let mut new_p = p.clone();
108                new_p.deref_mut().topic = topic;
109                Ok(Some(new_p))
110            }
111            None => Ok(None),
112        }
113    }
114
115    #[inline]
116    pub async fn rewrite_subscribe_topic(
117        &self,
118        s: Option<&Session>,
119        topic_filter: &str,
120    ) -> Result<Option<TopicFilter>> {
121        match self.rewrite_topic(Action::Subscribe, s, topic_filter).await? {
122            Some(new_tf) => {
123                log::debug!("new_tf: {new_tf}");
124                Ok(Some(new_tf))
125            }
126            None => Ok(None),
127        }
128    }
129
130    #[inline]
131    pub async fn rewrite_topic(
132        &self,
133        action: Action,
134        s: Option<&Session>,
135        topic: &str,
136    ) -> Result<Option<TopicName>> {
137        let t = Topic::from_str(topic)?;
138        //Take only the best match.
139        if let Some(r) = self
140            .cfg
141            .read()
142            .await
143            .rules()
144            .read()
145            .await
146            .matches(&t)
147            .iter()
148            .flat_map(|(_, rs)| rs)
149            .filter(|r| match (r.action, action) {
150                (Action::All, _) => true,
151                (Action::Subscribe, Action::Subscribe) => true,
152                (Action::Publish, Action::Publish) => true,
153                (_, _) => false,
154            })
155            .collect::<Vec<_>>()
156            .last()
157        {
158            log::debug!("rule: {r:?}, input topic: {topic}");
159            Ok(self.make_new_topic(r, s, topic))
160        } else {
161            Ok(None)
162        }
163    }
164
165    #[inline]
166    fn make_new_topic(&self, r: &Rule, s: Option<&Session>, topic: &str) -> Option<TopicName> {
167        let mut matcheds = Vec::new();
168        if let Some(re) = r.re.get() {
169            if let Some(caps) = re.captures(topic) {
170                for i in 1..caps.len() {
171                    if let Some(matched) = caps.get(i) {
172                        matcheds.push((i, matched.as_str()));
173                    }
174                }
175            }
176        }
177        log::debug!("matcheds: {matcheds:?}");
178        let mut new_topic = String::new();
179        for item in &r.dest_topic_items {
180            match item {
181                DestTopicItem::Normal(normal) => new_topic.push_str(normal),
182                DestTopicItem::Clientid => {
183                    if let Some(s) = s {
184                        new_topic.push_str(s.id.client_id.as_ref())
185                    } else {
186                        log::info!(
187                            "session is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
188                            r.source_topic_filter,
189                            r.dest_topic,
190                            topic
191                        );
192                        return None;
193                    }
194                }
195                DestTopicItem::Username => {
196                    if let Some(s) = s {
197                        if let Some(name) = s.username() {
198                            new_topic.push_str(name)
199                        } else {
200                            log::warn!(
201                                "{} username is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
202                                s.id,
203                                r.source_topic_filter,
204                                r.dest_topic,
205                                topic
206                            );
207                            return None;
208                        }
209                    } else {
210                        log::info!(
211                            "session is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
212                            r.source_topic_filter,
213                            r.dest_topic,
214                            topic
215                        );
216                        return None;
217                    }
218                }
219                DestTopicItem::Place(idx) => {
220                    if let Some((_, matched)) = matcheds.iter().find(|(p, _)| *p == *idx) {
221                        new_topic.push_str(matched)
222                    } else {
223                        log::warn!(
224                            "{:?} placeholders(${}) is not exist, source_topic_filter: {}, dest_topic: {}, regex: {:?}, input topic: {}, matcheds: {:?}",
225                            s.map(|s| &s.id),
226                            idx,
227                            r.source_topic_filter,
228                            r.dest_topic,
229                            r.re.get(),
230                            topic,
231                            matcheds
232                        );
233                        return None;
234                    }
235                }
236            }
237        }
238        Some(TopicName::from(new_topic))
239    }
240}
241
242#[async_trait]
243impl Handler for TopicRewriteHandler {
244    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
245        match param {
246            Parameter::MessagePublish(s, _f, p) => {
247                let p = if let Some(HookResult::Publish(publish)) = &acc { publish } else { p };
248                log::debug!("{:?} topic-rewrite MessagePublish ..", s.map(|s| &s.id));
249                match self.rewrite_publish_topic(s.as_ref().map(|s| *s), p).await {
250                    Err(e) => {
251                        log::error!("{:?} topic format error, {:?}", s.map(|s| &s.id), e);
252                        return (true, acc);
253                    }
254                    Ok(Some(p)) => {
255                        return (true, Some(HookResult::Publish(p)));
256                    }
257                    Ok(None) => {}
258                }
259            }
260            Parameter::ClientSubscribe(s, sub) => {
261                let topic_filter = if let Some(HookResult::TopicFilter(Some(topic_filter))) = &acc {
262                    topic_filter
263                } else {
264                    &sub.topic_filter
265                };
266                match self.rewrite_subscribe_topic(Some(*s), topic_filter).await {
267                    Err(e) => {
268                        log::error!("{} topic_filter format error, {:?}", s.id, e);
269                        return (true, acc);
270                    }
271                    Ok(Some(tf)) => {
272                        return (true, Some(HookResult::TopicFilter(Some(tf))));
273                    }
274                    Ok(None) => {}
275                }
276            }
277            Parameter::ClientUnsubscribe(s, unsub) => {
278                let topic_filter = if let Some(HookResult::TopicFilter(Some(topic_filter))) = &acc {
279                    topic_filter
280                } else {
281                    &unsub.topic_filter
282                };
283                match self.rewrite_subscribe_topic(Some(*s), topic_filter).await {
284                    Err(e) => {
285                        log::error!("{} topic_filter format error, {:?}", s.id, e);
286                        return (true, acc);
287                    }
288                    Ok(Some(tf)) => {
289                        return (true, Some(HookResult::TopicFilter(Some(tf))));
290                    }
291                    Ok(None) => {}
292                }
293            }
294            _ => {
295                log::error!("parameter is: {param:?}");
296            }
297        }
298        (true, acc)
299    }
300}