Skip to main content

rmqtt_sys_topic/
lib.rs

1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde_json::{self, json};
11use tokio::{spawn, sync::RwLock, time::sleep};
12
13use rmqtt::{
14    codec::v5::PublishProperties,
15    context::ServerContext,
16    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
17    macros::Plugin,
18    plugin::{PackageInfo, Plugin},
19    register,
20    session::SessionState,
21    types::{ClientId, From, Id, NodeId, QoS, TopicName, UserName},
22    utils::timestamp_millis,
23    Result,
24};
25
26use config::PluginConfig;
27use rmqtt::types::{CodecPublish, Publish};
28
29mod config;
30
31register!(SystemTopicPlugin::new);
32
33#[derive(Plugin)]
34struct SystemTopicPlugin {
35    scx: ServerContext,
36    register: Box<dyn Register>,
37    cfg: Arc<RwLock<PluginConfig>>,
38    running: Arc<AtomicBool>,
39}
40
41impl SystemTopicPlugin {
42    #[inline]
43    async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
44        let name = name.into();
45        let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
46        log::info!("{name} SystemTopicPlugin cfg: {cfg:?}");
47        let register = scx.extends.hook_mgr().register();
48        let cfg = Arc::new(RwLock::new(cfg));
49        let running = Arc::new(AtomicBool::new(false));
50        Ok(Self { scx, register, cfg, running })
51    }
52
53    fn start(scx: ServerContext, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
54        spawn(async move {
55            let min = Duration::from_secs(1);
56            loop {
57                let (publish_interval, publish_qos, expiry_interval) = {
58                    let cfg_rl = cfg.read().await;
59                    (cfg_rl.publish_interval, cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
60                };
61
62                let publish_interval = if publish_interval < min { min } else { publish_interval };
63                sleep(publish_interval).await;
64                if running.load(Ordering::SeqCst) {
65                    Self::send_stats(&scx, publish_qos, expiry_interval).await;
66                    Self::send_metrics(&scx, publish_qos, expiry_interval).await;
67                }
68            }
69        });
70    }
71
72    //Statistics
73    //$SYS/brokers/${node}/stats
74    async fn send_stats(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
75        let payload = scx.stats.clone(scx).await.to_json(scx).await;
76        let nodeid = scx.node.id();
77        let topic = format!("$SYS/brokers/{nodeid}/stats");
78        sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
79    }
80
81    //Metrics
82    //$SYS/brokers/${node}/metrics
83    async fn send_metrics(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
84        let payload = scx.metrics.to_json();
85        let nodeid = scx.node.id();
86        let topic = format!("$SYS/brokers/{nodeid}/metrics");
87        sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
88    }
89}
90
91#[async_trait]
92impl Plugin for SystemTopicPlugin {
93    #[inline]
94    async fn init(&mut self) -> Result<()> {
95        log::info!("{} init", self.name());
96        let cfg = &self.cfg;
97
98        self.register.add(Type::SessionCreated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
99        self.register.add(Type::SessionTerminated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
100        self.register.add(Type::ClientConnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
101        self.register.add(Type::ClientDisconnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
102        self.register.add(Type::SessionSubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
103        self.register.add(Type::SessionUnsubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
104
105        Self::start(self.scx.clone(), self.cfg.clone(), self.running.clone());
106        Ok(())
107    }
108
109    #[inline]
110    async fn get_config(&self) -> Result<serde_json::Value> {
111        self.cfg.read().await.to_json()
112    }
113
114    #[inline]
115    async fn load_config(&mut self) -> Result<()> {
116        let new_cfg = self.scx.plugins.read_config_default::<PluginConfig>(self.name())?;
117        *self.cfg.write().await = new_cfg;
118        log::debug!("load_config ok,  {:?}", self.cfg);
119        Ok(())
120    }
121
122    #[inline]
123    async fn start(&mut self) -> Result<()> {
124        log::info!("{} start", self.name());
125        self.register.start().await;
126        self.running.store(true, Ordering::SeqCst);
127        Ok(())
128    }
129
130    #[inline]
131    async fn stop(&mut self) -> Result<bool> {
132        log::info!("{} stop", self.name());
133        self.register.stop().await;
134        self.running.store(false, Ordering::SeqCst);
135        Ok(false)
136    }
137}
138
139struct SystemTopicHandler {
140    scx: ServerContext,
141    cfg: Arc<RwLock<PluginConfig>>,
142    nodeid: NodeId,
143}
144
145impl SystemTopicHandler {
146    fn new(scx: &ServerContext, cfg: &Arc<RwLock<PluginConfig>>) -> Self {
147        Self { scx: scx.clone(), cfg: cfg.clone(), nodeid: scx.node.id() }
148    }
149}
150
151#[async_trait]
152impl Handler for SystemTopicHandler {
153    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
154        log::debug!("param: {param:?}, acc: {acc:?}");
155        let now = chrono::Local::now();
156        let now_time = now.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
157        if let Some((topic, payload)) = match param {
158            Parameter::SessionCreated(session) => {
159                let body = json!({
160                    "node": session.id.node(),
161                    "ipaddress": session.id.remote_addr,
162                    "clientid": session.id.client_id,
163                    "username": session.id.username_ref(),
164                    "created_at": session.created_at().await.unwrap_or_default(),
165                    "time": now_time
166                });
167                let topic = format!("$SYS/brokers/{}/session/{}/created", self.nodeid, session.id.client_id);
168                Some((topic, body))
169            }
170
171            Parameter::SessionTerminated(session, reason) => {
172                let body = json!({
173                    "node": session.id.node(),
174                    "ipaddress": session.id.remote_addr,
175                    "clientid": session.id.client_id,
176                    "username": session.id.username_ref(),
177                    "reason": reason.to_string(),
178                    "time": now_time
179                });
180                let topic =
181                    format!("$SYS/brokers/{}/session/{}/terminated", self.nodeid, session.id.client_id);
182                Some((topic, body))
183            }
184            Parameter::ClientConnected(session) => {
185                let mut body = session
186                    .connect_info()
187                    .await
188                    .map(|connect_info| connect_info.to_hook_body(true))
189                    .unwrap_or_default();
190                if let Some(obj) = body.as_object_mut() {
191                    obj.insert(
192                        "connected_at".into(),
193                        serde_json::Value::Number(serde_json::Number::from(
194                            session.connected_at().await.unwrap_or_default(),
195                        )),
196                    );
197                    obj.insert(
198                        "session_present".into(),
199                        serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
200                    );
201                    obj.insert("time".into(), serde_json::Value::String(now_time));
202                }
203                let topic =
204                    format!("$SYS/brokers/{}/clients/{}/connected", self.nodeid, session.id.client_id);
205                Some((topic, body))
206            }
207            Parameter::ClientDisconnected(session, reason) => {
208                let body = json!({
209                    "node": session.id.node(),
210                    "ipaddress": session.id.remote_addr,
211                    "clientid": session.id.client_id,
212                    "username": session.id.username_ref(),
213                    "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
214                    "reason": reason.to_string(),
215                    "time": now_time
216                });
217                let topic =
218                    format!("$SYS/brokers/{}/clients/{}/disconnected", self.nodeid, session.id.client_id);
219                Some((topic, body))
220            }
221
222            Parameter::SessionSubscribed(session, subscribe) => {
223                let body = json!({
224                    "node": session.id.node(),
225                    "ipaddress": session.id.remote_addr,
226                    "clientid": session.id.client_id,
227                    "username": session.id.username_ref(),
228                    "topic": subscribe.topic_filter,
229                    "opts": subscribe.opts.to_json(),
230                    "time": now_time
231                });
232                let topic =
233                    format!("$SYS/brokers/{}/session/{}/subscribed", self.nodeid, session.id.client_id);
234                Some((topic, body))
235            }
236
237            Parameter::SessionUnsubscribed(session, unsubscribed) => {
238                let topic = unsubscribed.topic_filter.clone();
239                let body = json!({
240                    "node": session.id.node(),
241                    "ipaddress": session.id.remote_addr,
242                    "clientid": session.id.client_id,
243                    "username": session.id.username_ref(),
244                    "topic": topic,
245                    "time": now_time
246                });
247                let topic =
248                    format!("$SYS/brokers/{}/session/{}/unsubscribed", self.nodeid, session.id.client_id);
249                Some((topic, body))
250            }
251
252            _ => {
253                log::error!("unimplemented, {param:?}");
254                None
255            }
256        } {
257            let nodeid = self.nodeid;
258            let (publish_qos, expiry_interval) = {
259                let cfg_rl = self.cfg.read().await;
260                (cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
261            };
262
263            let scx = self.scx.clone();
264            spawn(sys_publish(scx, nodeid, topic, publish_qos, payload, expiry_interval));
265        }
266        (true, acc)
267    }
268}
269
270#[inline]
271async fn sys_publish(
272    scx: ServerContext,
273    nodeid: NodeId,
274    topic: String,
275    publish_qos: QoS,
276    payload: serde_json::Value,
277    message_expiry_interval: Duration,
278) {
279    match serde_json::to_string(&payload) {
280        Ok(payload) => {
281            let from = From::from_system(Id::new(
282                nodeid,
283                0,
284                None,
285                None,
286                ClientId::from_static("system"),
287                Some(UserName::from("system")),
288            ));
289
290            let p = CodecPublish {
291                dup: false,
292                retain: false,
293                qos: publish_qos,
294                topic: TopicName::from(topic),
295                packet_id: None,
296                payload: Bytes::from(payload),
297                properties: Some(PublishProperties::default()),
298            };
299            let p = <CodecPublish as Into<Publish>>::into(p).create_time(timestamp_millis());
300
301            //hook, message_publish
302            let p = scx.extends.hook_mgr().message_publish(None, from.clone(), &p).await.unwrap_or(p);
303
304            let storage_available = scx.extends.message_mgr().await.enable();
305
306            if let Err(e) =
307                SessionState::forwards(&scx, from, p, storage_available, Some(message_expiry_interval)).await
308            {
309                log::warn!("{e:?}");
310            }
311        }
312        Err(e) => {
313            log::error!("{e:?}");
314        }
315    }
316}