rmqtt_sys_topic/
lib.rs

1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde_json::{self, json};
11use tokio::{spawn, sync::RwLock, time::sleep};
12
13use rmqtt::{
14    codec::v5::PublishProperties,
15    context::ServerContext,
16    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
17    macros::Plugin,
18    plugin::{PackageInfo, Plugin},
19    register,
20    session::SessionState,
21    types::{ClientId, From, Id, NodeId, QoS, TopicName, UserName},
22    utils::timestamp_millis,
23    Result,
24};
25
26use config::PluginConfig;
27
28mod config;
29
30register!(SystemTopicPlugin::new);
31
32#[derive(Plugin)]
33struct SystemTopicPlugin {
34    scx: ServerContext,
35    register: Box<dyn Register>,
36    cfg: Arc<RwLock<PluginConfig>>,
37    running: Arc<AtomicBool>,
38}
39
40impl SystemTopicPlugin {
41    #[inline]
42    async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
43        let name = name.into();
44        let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
45        log::info!("{name} SystemTopicPlugin cfg: {cfg:?}");
46        let register = scx.extends.hook_mgr().register();
47        let cfg = Arc::new(RwLock::new(cfg));
48        let running = Arc::new(AtomicBool::new(false));
49        Ok(Self { scx, register, cfg, running })
50    }
51
52    fn start(scx: ServerContext, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
53        spawn(async move {
54            let min = Duration::from_secs(1);
55            loop {
56                let (publish_interval, publish_qos, expiry_interval) = {
57                    let cfg_rl = cfg.read().await;
58                    (cfg_rl.publish_interval, cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
59                };
60
61                let publish_interval = if publish_interval < min { min } else { publish_interval };
62                sleep(publish_interval).await;
63                if running.load(Ordering::SeqCst) {
64                    Self::send_stats(&scx, publish_qos, expiry_interval).await;
65                    Self::send_metrics(&scx, publish_qos, expiry_interval).await;
66                }
67            }
68        });
69    }
70
71    //Statistics
72    //$SYS/brokers/${node}/stats
73    async fn send_stats(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
74        let payload = scx.stats.clone(scx).await.to_json(scx).await;
75        let nodeid = scx.node.id();
76        let topic = format!("$SYS/brokers/{nodeid}/stats");
77        sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
78    }
79
80    //Metrics
81    //$SYS/brokers/${node}/metrics
82    async fn send_metrics(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
83        let payload = scx.metrics.to_json();
84        let nodeid = scx.node.id();
85        let topic = format!("$SYS/brokers/{nodeid}/metrics");
86        sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
87    }
88}
89
90#[async_trait]
91impl Plugin for SystemTopicPlugin {
92    #[inline]
93    async fn init(&mut self) -> Result<()> {
94        log::info!("{} init", self.name());
95        let cfg = &self.cfg;
96
97        self.register.add(Type::SessionCreated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
98        self.register.add(Type::SessionTerminated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
99        self.register.add(Type::ClientConnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
100        self.register.add(Type::ClientDisconnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
101        self.register.add(Type::SessionSubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
102        self.register.add(Type::SessionUnsubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
103
104        Self::start(self.scx.clone(), self.cfg.clone(), self.running.clone());
105        Ok(())
106    }
107
108    #[inline]
109    async fn get_config(&self) -> Result<serde_json::Value> {
110        self.cfg.read().await.to_json()
111    }
112
113    #[inline]
114    async fn load_config(&mut self) -> Result<()> {
115        let new_cfg = self.scx.plugins.read_config_default::<PluginConfig>(self.name())?;
116        *self.cfg.write().await = new_cfg;
117        log::debug!("load_config ok,  {:?}", self.cfg);
118        Ok(())
119    }
120
121    #[inline]
122    async fn start(&mut self) -> Result<()> {
123        log::info!("{} start", self.name());
124        self.register.start().await;
125        self.running.store(true, Ordering::SeqCst);
126        Ok(())
127    }
128
129    #[inline]
130    async fn stop(&mut self) -> Result<bool> {
131        log::info!("{} stop", self.name());
132        self.register.stop().await;
133        self.running.store(false, Ordering::SeqCst);
134        Ok(false)
135    }
136}
137
138struct SystemTopicHandler {
139    scx: ServerContext,
140    cfg: Arc<RwLock<PluginConfig>>,
141    nodeid: NodeId,
142}
143
144impl SystemTopicHandler {
145    fn new(scx: &ServerContext, cfg: &Arc<RwLock<PluginConfig>>) -> Self {
146        Self { scx: scx.clone(), cfg: cfg.clone(), nodeid: scx.node.id() }
147    }
148}
149
150#[async_trait]
151impl Handler for SystemTopicHandler {
152    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
153        log::debug!("param: {:?}, acc: {:?}", param, acc);
154        let now = chrono::Local::now();
155        let now_time = now.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
156        if let Some((topic, payload)) = match param {
157            Parameter::SessionCreated(session) => {
158                let body = json!({
159                    "node": session.id.node(),
160                    "ipaddress": session.id.remote_addr,
161                    "clientid": session.id.client_id,
162                    "username": session.id.username_ref(),
163                    "created_at": session.created_at().await.unwrap_or_default(),
164                    "time": now_time
165                });
166                let topic = format!("$SYS/brokers/{}/session/{}/created", self.nodeid, session.id.client_id);
167                Some((topic, body))
168            }
169
170            Parameter::SessionTerminated(session, reason) => {
171                let body = json!({
172                    "node": session.id.node(),
173                    "ipaddress": session.id.remote_addr,
174                    "clientid": session.id.client_id,
175                    "username": session.id.username_ref(),
176                    "reason": reason.to_string(),
177                    "time": now_time
178                });
179                let topic =
180                    format!("$SYS/brokers/{}/session/{}/terminated", self.nodeid, session.id.client_id);
181                Some((topic, body))
182            }
183            Parameter::ClientConnected(session) => {
184                let mut body = session
185                    .connect_info()
186                    .await
187                    .map(|connect_info| connect_info.to_hook_body())
188                    .unwrap_or_default();
189                if let Some(obj) = body.as_object_mut() {
190                    obj.insert(
191                        "connected_at".into(),
192                        serde_json::Value::Number(serde_json::Number::from(
193                            session.connected_at().await.unwrap_or_default(),
194                        )),
195                    );
196                    obj.insert(
197                        "session_present".into(),
198                        serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
199                    );
200                    obj.insert("time".into(), serde_json::Value::String(now_time));
201                }
202                let topic =
203                    format!("$SYS/brokers/{}/clients/{}/connected", self.nodeid, session.id.client_id);
204                Some((topic, body))
205            }
206            Parameter::ClientDisconnected(session, reason) => {
207                let body = json!({
208                    "node": session.id.node(),
209                    "ipaddress": session.id.remote_addr,
210                    "clientid": session.id.client_id,
211                    "username": session.id.username_ref(),
212                    "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
213                    "reason": reason.to_string(),
214                    "time": now_time
215                });
216                let topic =
217                    format!("$SYS/brokers/{}/clients/{}/disconnected", self.nodeid, session.id.client_id);
218                Some((topic, body))
219            }
220
221            Parameter::SessionSubscribed(session, subscribe) => {
222                let body = json!({
223                    "node": session.id.node(),
224                    "ipaddress": session.id.remote_addr,
225                    "clientid": session.id.client_id,
226                    "username": session.id.username_ref(),
227                    "topic": subscribe.topic_filter,
228                    "opts": subscribe.opts.to_json(),
229                    "time": now_time
230                });
231                let topic =
232                    format!("$SYS/brokers/{}/session/{}/subscribed", self.nodeid, session.id.client_id);
233                Some((topic, body))
234            }
235
236            Parameter::SessionUnsubscribed(session, unsubscribed) => {
237                let topic = unsubscribed.topic_filter.clone();
238                let body = json!({
239                    "node": session.id.node(),
240                    "ipaddress": session.id.remote_addr,
241                    "clientid": session.id.client_id,
242                    "username": session.id.username_ref(),
243                    "topic": topic,
244                    "time": now_time
245                });
246                let topic =
247                    format!("$SYS/brokers/{}/session/{}/unsubscribed", self.nodeid, session.id.client_id);
248                Some((topic, body))
249            }
250
251            _ => {
252                log::error!("unimplemented, {param:?}");
253                None
254            }
255        } {
256            let nodeid = self.nodeid;
257            let (publish_qos, expiry_interval) = {
258                let cfg_rl = self.cfg.read().await;
259                (cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
260            };
261
262            let scx = self.scx.clone();
263            spawn(sys_publish(scx, nodeid, topic, publish_qos, payload, expiry_interval));
264        }
265        (true, acc)
266    }
267}
268
269#[inline]
270async fn sys_publish(
271    scx: ServerContext,
272    nodeid: NodeId,
273    topic: String,
274    publish_qos: QoS,
275    payload: serde_json::Value,
276    message_expiry_interval: Duration,
277) {
278    match serde_json::to_string(&payload) {
279        Ok(payload) => {
280            let from = From::from_system(Id::new(
281                nodeid,
282                0,
283                None,
284                None,
285                ClientId::from_static("system"),
286                Some(UserName::from("system")),
287            ));
288
289            let p = Box::new(rmqtt::codec::types::Publish {
290                dup: false,
291                retain: false,
292                qos: publish_qos,
293                topic: TopicName::from(topic),
294                packet_id: None,
295                payload: Bytes::from(payload),
296                properties: Some(PublishProperties::default()),
297                delay_interval: None,
298                create_time: Some(timestamp_millis()),
299            });
300
301            //hook, message_publish
302            let p = scx.extends.hook_mgr().message_publish(None, from.clone(), &p).await.unwrap_or(p);
303
304            let storage_available = scx.extends.message_mgr().await.enable();
305
306            if let Err(e) =
307                SessionState::forwards(&scx, from, p, storage_available, Some(message_expiry_interval)).await
308            {
309                log::warn!("{e:?}");
310            }
311        }
312        Err(e) => {
313            log::error!("{e:?}");
314        }
315    }
316}