Skip to main content

rmqtt_auto_subscription/
lib.rs

1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::sync::RwLock;
7
8use rmqtt::{
9    context::ServerContext,
10    macros::Plugin,
11    plugin::{PackageInfo, Plugin},
12    register,
13    subscribe::{AutoSubscription, DefaultAutoSubscription},
14    types::{Id, Subscribe, TopicFilter},
15    Result,
16};
17
18use config::PluginConfig;
19
20mod config;
21
22register!(AutoSubscriptionPlugin::new);
23
24#[derive(Plugin)]
25struct AutoSubscriptionPlugin {
26    scx: ServerContext,
27    cfg: Arc<RwLock<PluginConfig>>,
28}
29
30impl AutoSubscriptionPlugin {
31    #[inline]
32    async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
33        let name = name.into();
34        let cfg = scx.plugins.read_config::<PluginConfig>(&name)?;
35        let cfg = Arc::new(RwLock::new(cfg));
36        log::info!("{} AutoSubscriptionPlugin cfg: {:?}", name, cfg.read().await);
37        Ok(Self { scx, cfg })
38    }
39}
40
41#[async_trait]
42impl Plugin for AutoSubscriptionPlugin {
43    #[inline]
44    async fn init(&mut self) -> Result<()> {
45        log::info!("{} init", self.name());
46        Ok(())
47    }
48
49    #[inline]
50    async fn get_config(&self) -> Result<serde_json::Value> {
51        self.cfg.read().await.to_json()
52    }
53
54    #[inline]
55    async fn load_config(&mut self) -> Result<()> {
56        let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
57        *self.cfg.write().await = new_cfg;
58        log::debug!("load_config ok,  {:?}", self.cfg);
59        Ok(())
60    }
61
62    #[inline]
63    async fn start(&mut self) -> Result<()> {
64        log::info!("{} start", self.name());
65        *self.scx.extends.auto_subscription_mut().await = Box::new(XAutoSubscription::new(self.cfg.clone()));
66        Ok(())
67    }
68
69    #[inline]
70    async fn stop(&mut self) -> Result<bool> {
71        log::info!("{} stop", self.name());
72        *self.scx.extends.auto_subscription_mut().await = Box::new(DefaultAutoSubscription);
73        Ok(false)
74    }
75}
76
77pub struct XAutoSubscription {
78    cfg: Arc<RwLock<PluginConfig>>,
79}
80
81impl XAutoSubscription {
82    #[inline]
83    pub(crate) fn new(cfg: Arc<RwLock<PluginConfig>>) -> XAutoSubscription {
84        Self { cfg }
85    }
86}
87
88#[async_trait]
89impl AutoSubscription for XAutoSubscription {
90    #[inline]
91    fn enable(&self) -> bool {
92        true
93    }
94
95    #[inline]
96    async fn subscribes(&self, id: &Id) -> Result<Vec<Subscribe>> {
97        let mut subs = Vec::new();
98        for item in self.cfg.read().await.subscribes.iter() {
99            let mut sub = item.sub.clone();
100            if item.has_clientid_placeholder {
101                sub.topic_filter = TopicFilter::from(sub.topic_filter.replace("${clientid}", &id.client_id));
102            }
103            if item.has_username_placeholder {
104                if let Some(username) = &id.username {
105                    sub.topic_filter = TopicFilter::from(sub.topic_filter.replace("${username}", username));
106                } else {
107                    log::warn!("{id} auto subscribe failed, username is not exist");
108                    continue;
109                }
110            }
111            subs.push(sub);
112        }
113        Ok(subs)
114    }
115}