rmqtt_auto_subscription/
lib.rs1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::sync::RwLock;
7
8use rmqtt::{
9 context::ServerContext,
10 macros::Plugin,
11 plugin::{PackageInfo, Plugin},
12 register,
13 subscribe::{AutoSubscription, DefaultAutoSubscription},
14 types::{Id, Subscribe, TopicFilter},
15 Result,
16};
17
18use config::PluginConfig;
19
20mod config;
21
22register!(AutoSubscriptionPlugin::new);
23
24#[derive(Plugin)]
25struct AutoSubscriptionPlugin {
26 scx: ServerContext,
27 cfg: Arc<RwLock<PluginConfig>>,
28}
29
30impl AutoSubscriptionPlugin {
31 #[inline]
32 async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
33 let name = name.into();
34 let cfg = scx.plugins.read_config::<PluginConfig>(&name)?;
35 let cfg = Arc::new(RwLock::new(cfg));
36 log::info!("{} AutoSubscriptionPlugin cfg: {:?}", name, cfg.read().await);
37 Ok(Self { scx, cfg })
38 }
39}
40
41#[async_trait]
42impl Plugin for AutoSubscriptionPlugin {
43 #[inline]
44 async fn init(&mut self) -> Result<()> {
45 log::info!("{} init", self.name());
46 Ok(())
47 }
48
49 #[inline]
50 async fn get_config(&self) -> Result<serde_json::Value> {
51 self.cfg.read().await.to_json()
52 }
53
54 #[inline]
55 async fn load_config(&mut self) -> Result<()> {
56 let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
57 *self.cfg.write().await = new_cfg;
58 log::debug!("load_config ok, {:?}", self.cfg);
59 Ok(())
60 }
61
62 #[inline]
63 async fn start(&mut self) -> Result<()> {
64 log::info!("{} start", self.name());
65 *self.scx.extends.auto_subscription_mut().await = Box::new(XAutoSubscription::new(self.cfg.clone()));
66 Ok(())
67 }
68
69 #[inline]
70 async fn stop(&mut self) -> Result<bool> {
71 log::info!("{} stop", self.name());
72 *self.scx.extends.auto_subscription_mut().await = Box::new(DefaultAutoSubscription);
73 Ok(false)
74 }
75}
76
77pub struct XAutoSubscription {
78 cfg: Arc<RwLock<PluginConfig>>,
79}
80
81impl XAutoSubscription {
82 #[inline]
83 pub(crate) fn new(cfg: Arc<RwLock<PluginConfig>>) -> XAutoSubscription {
84 Self { cfg }
85 }
86}
87
88#[async_trait]
89impl AutoSubscription for XAutoSubscription {
90 #[inline]
91 fn enable(&self) -> bool {
92 true
93 }
94
95 #[inline]
96 async fn subscribes(&self, id: &Id) -> Result<Vec<Subscribe>> {
97 let mut subs = Vec::new();
98 for item in self.cfg.read().await.subscribes.iter() {
99 let mut sub = item.sub.clone();
100 if item.has_clientid_placeholder {
101 sub.topic_filter = TopicFilter::from(sub.topic_filter.replace("${clientid}", &id.client_id));
102 }
103 if item.has_username_placeholder {
104 if let Some(username) = &id.username {
105 sub.topic_filter = TopicFilter::from(sub.topic_filter.replace("${username}", username));
106 } else {
107 log::warn!("{id} auto subscribe failed, username is not exist");
108 continue;
109 }
110 }
111 subs.push(sub);
112 }
113 Ok(subs)
114 }
115}