1#![deny(unsafe_code)]
2
3use std::ops::DerefMut;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use tokio::sync::RwLock;
9
10use rmqtt::{
11 context::ServerContext,
12 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
13 macros::Plugin,
14 plugin::{PackageInfo, Plugin},
15 register,
16 session::Session,
17 types::{Publish, Topic, TopicFilter, TopicName},
18 Result,
19};
20
21use config::{Action, DestTopicItem, PluginConfig, Rule};
22
23mod config;
24
25register!(TopicRewritePlugin::new);
26
27#[derive(Plugin)]
28struct TopicRewritePlugin {
29 scx: ServerContext,
30 register: Box<dyn Register>,
31 cfg: Arc<RwLock<PluginConfig>>,
32}
33
34impl TopicRewritePlugin {
35 #[inline]
36 async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
37 let name = name.into();
38 let cfg = scx.plugins.read_config::<PluginConfig>(&name)?;
39 let cfg = Arc::new(RwLock::new(cfg));
40 log::info!("{} TopicRewritePlugin cfg: {:?}", name, cfg.read().await);
41 let register = scx.extends.hook_mgr().register();
42 Ok(Self { scx, register, cfg })
43 }
44}
45
46#[async_trait]
47impl Plugin for TopicRewritePlugin {
48 #[inline]
49 async fn init(&mut self) -> Result<()> {
50 log::info!("{} init", self.name());
51 let cfg = &self.cfg;
52 let priority = cfg.read().await.priority;
53 self.register
54 .add_priority(Type::MessagePublish, priority, Box::new(TopicRewriteHandler::new(cfg)))
55 .await;
56 self.register
57 .add_priority(Type::ClientSubscribe, priority, Box::new(TopicRewriteHandler::new(cfg)))
58 .await;
59 self.register
60 .add_priority(Type::ClientUnsubscribe, priority, Box::new(TopicRewriteHandler::new(cfg)))
61 .await;
62 Ok(())
63 }
64
65 #[inline]
66 async fn get_config(&self) -> Result<serde_json::Value> {
67 self.cfg.read().await.to_json()
68 }
69
70 #[inline]
71 async fn load_config(&mut self) -> Result<()> {
72 let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
73 *self.cfg.write().await = new_cfg;
74 log::debug!("load_config ok, {:?}", self.cfg);
75 Ok(())
76 }
77
78 #[inline]
79 async fn start(&mut self) -> Result<()> {
80 log::info!("{} start", self.name());
81 self.register.start().await;
82 Ok(())
83 }
84
85 #[inline]
86 async fn stop(&mut self) -> Result<bool> {
87 log::info!("{} stop", self.name());
88 self.register.stop().await;
89 Ok(false)
90 }
91}
92
93struct TopicRewriteHandler {
94 cfg: Arc<RwLock<PluginConfig>>,
95}
96
97impl TopicRewriteHandler {
98 fn new(cfg: &Arc<RwLock<PluginConfig>>) -> Self {
99 Self { cfg: cfg.clone() }
100 }
101
102 #[inline]
103 pub async fn rewrite_publish_topic(&self, s: Option<&Session>, p: &Publish) -> Result<Option<Publish>> {
104 match self.rewrite_topic(Action::Publish, s, &p.topic).await? {
105 Some(topic) => {
106 log::debug!("new_topic: {topic}");
107 let mut new_p = p.clone();
108 new_p.deref_mut().topic = topic;
109 Ok(Some(new_p))
110 }
111 None => Ok(None),
112 }
113 }
114
115 #[inline]
116 pub async fn rewrite_subscribe_topic(
117 &self,
118 s: Option<&Session>,
119 topic_filter: &str,
120 ) -> Result<Option<TopicFilter>> {
121 match self.rewrite_topic(Action::Subscribe, s, topic_filter).await? {
122 Some(new_tf) => {
123 log::debug!("new_tf: {new_tf}");
124 Ok(Some(new_tf))
125 }
126 None => Ok(None),
127 }
128 }
129
130 #[inline]
131 pub async fn rewrite_topic(
132 &self,
133 action: Action,
134 s: Option<&Session>,
135 topic: &str,
136 ) -> Result<Option<TopicName>> {
137 let t = Topic::from_str(topic)?;
138 if let Some(r) = self
140 .cfg
141 .read()
142 .await
143 .rules()
144 .read()
145 .await
146 .matches(&t)
147 .iter()
148 .flat_map(|(_, rs)| rs)
149 .filter(|r| match (r.action, action) {
150 (Action::All, _) => true,
151 (Action::Subscribe, Action::Subscribe) => true,
152 (Action::Publish, Action::Publish) => true,
153 (_, _) => false,
154 })
155 .collect::<Vec<_>>()
156 .last()
157 {
158 log::debug!("rule: {r:?}, input topic: {topic}");
159 Ok(self.make_new_topic(r, s, topic))
160 } else {
161 Ok(None)
162 }
163 }
164
165 #[inline]
166 fn make_new_topic(&self, r: &Rule, s: Option<&Session>, topic: &str) -> Option<TopicName> {
167 let mut matcheds = Vec::new();
168 if let Some(re) = r.re.get() {
169 if let Some(caps) = re.captures(topic) {
170 for i in 1..caps.len() {
171 if let Some(matched) = caps.get(i) {
172 matcheds.push((i, matched.as_str()));
173 }
174 }
175 }
176 }
177 log::debug!("matcheds: {matcheds:?}");
178 let mut new_topic = String::new();
179 for item in &r.dest_topic_items {
180 match item {
181 DestTopicItem::Normal(normal) => new_topic.push_str(normal),
182 DestTopicItem::Clientid => {
183 if let Some(s) = s {
184 new_topic.push_str(s.id.client_id.as_ref())
185 } else {
186 log::info!(
187 "session is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
188 r.source_topic_filter,
189 r.dest_topic,
190 topic
191 );
192 return None;
193 }
194 }
195 DestTopicItem::Username => {
196 if let Some(s) = s {
197 if let Some(name) = s.username() {
198 new_topic.push_str(name)
199 } else {
200 log::warn!(
201 "{} username is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
202 s.id,
203 r.source_topic_filter,
204 r.dest_topic,
205 topic
206 );
207 return None;
208 }
209 } else {
210 log::info!(
211 "session is not exist, source_topic_filter: {}, dest_topic: {}, input topic: {}",
212 r.source_topic_filter,
213 r.dest_topic,
214 topic
215 );
216 return None;
217 }
218 }
219 DestTopicItem::Place(idx) => {
220 if let Some((_, matched)) = matcheds.iter().find(|(p, _)| *p == *idx) {
221 new_topic.push_str(matched)
222 } else {
223 log::warn!(
224 "{:?} placeholders(${}) is not exist, source_topic_filter: {}, dest_topic: {}, regex: {:?}, input topic: {}, matcheds: {:?}",
225 s.map(|s| &s.id),
226 idx,
227 r.source_topic_filter,
228 r.dest_topic,
229 r.re.get(),
230 topic,
231 matcheds
232 );
233 return None;
234 }
235 }
236 }
237 }
238 Some(TopicName::from(new_topic))
239 }
240}
241
242#[async_trait]
243impl Handler for TopicRewriteHandler {
244 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
245 match param {
246 Parameter::MessagePublish(s, _f, p) => {
247 let p = if let Some(HookResult::Publish(publish)) = &acc { publish } else { p };
248 log::debug!("{:?} topic-rewrite MessagePublish ..", s.map(|s| &s.id));
249 match self.rewrite_publish_topic(s.as_ref().map(|s| *s), p).await {
250 Err(e) => {
251 log::error!("{:?} topic format error, {:?}", s.map(|s| &s.id), e);
252 return (true, acc);
253 }
254 Ok(Some(p)) => {
255 return (true, Some(HookResult::Publish(p)));
256 }
257 Ok(None) => {}
258 }
259 }
260 Parameter::ClientSubscribe(s, sub) => {
261 let topic_filter = if let Some(HookResult::TopicFilter(Some(topic_filter))) = &acc {
262 topic_filter
263 } else {
264 &sub.topic_filter
265 };
266 match self.rewrite_subscribe_topic(Some(*s), topic_filter).await {
267 Err(e) => {
268 log::error!("{} topic_filter format error, {:?}", s.id, e);
269 return (true, acc);
270 }
271 Ok(Some(tf)) => {
272 return (true, Some(HookResult::TopicFilter(Some(tf))));
273 }
274 Ok(None) => {}
275 }
276 }
277 Parameter::ClientUnsubscribe(s, unsub) => {
278 let topic_filter = if let Some(HookResult::TopicFilter(Some(topic_filter))) = &acc {
279 topic_filter
280 } else {
281 &unsub.topic_filter
282 };
283 match self.rewrite_subscribe_topic(Some(*s), topic_filter).await {
284 Err(e) => {
285 log::error!("{} topic_filter format error, {:?}", s.id, e);
286 return (true, acc);
287 }
288 Ok(Some(tf)) => {
289 return (true, Some(HookResult::TopicFilter(Some(tf))));
290 }
291 Ok(None) => {}
292 }
293 }
294 _ => {
295 log::error!("parameter is: {param:?}");
296 }
297 }
298 (true, acc)
299 }
300}