1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde_json::{self, json};
11use tokio::{spawn, sync::RwLock, time::sleep};
12
13use rmqtt::{
14 codec::v5::PublishProperties,
15 context::ServerContext,
16 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
17 macros::Plugin,
18 plugin::{PackageInfo, Plugin},
19 register,
20 session::SessionState,
21 types::{ClientId, From, Id, NodeId, QoS, TopicName, UserName},
22 utils::timestamp_millis,
23 Result,
24};
25
26use config::PluginConfig;
27
28mod config;
29
30register!(SystemTopicPlugin::new);
31
32#[derive(Plugin)]
33struct SystemTopicPlugin {
34 scx: ServerContext,
35 register: Box<dyn Register>,
36 cfg: Arc<RwLock<PluginConfig>>,
37 running: Arc<AtomicBool>,
38}
39
40impl SystemTopicPlugin {
41 #[inline]
42 async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
43 let name = name.into();
44 let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
45 log::debug!("{name} SystemTopicPlugin cfg: {cfg:?}");
46 let register = scx.extends.hook_mgr().register();
47 let cfg = Arc::new(RwLock::new(cfg));
48 let running = Arc::new(AtomicBool::new(false));
49 Ok(Self { scx, register, cfg, running })
50 }
51
52 fn start(scx: ServerContext, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
53 spawn(async move {
54 let min = Duration::from_secs(1);
55 loop {
56 let (publish_interval, publish_qos, expiry_interval) = {
57 let cfg_rl = cfg.read().await;
58 (cfg_rl.publish_interval, cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
59 };
60
61 let publish_interval = if publish_interval < min { min } else { publish_interval };
62 sleep(publish_interval).await;
63 if running.load(Ordering::SeqCst) {
64 Self::send_stats(&scx, publish_qos, expiry_interval).await;
65 Self::send_metrics(&scx, publish_qos, expiry_interval).await;
66 }
67 }
68 });
69 }
70
71 async fn send_stats(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
74 let payload = scx.stats.clone(scx).await.to_json(scx).await;
75 let nodeid = scx.node.id();
76 let topic = format!("$SYS/brokers/{nodeid}/stats");
77 sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
78 }
79
80 async fn send_metrics(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
83 let payload = scx.metrics.to_json();
84 let nodeid = scx.node.id();
85 let topic = format!("$SYS/brokers/{nodeid}/metrics");
86 sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
87 }
88}
89
90#[async_trait]
91impl Plugin for SystemTopicPlugin {
92 #[inline]
93 async fn init(&mut self) -> Result<()> {
94 log::info!("{} init", self.name());
95 let cfg = &self.cfg;
96
97 self.register.add(Type::SessionCreated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
98 self.register.add(Type::SessionTerminated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
99 self.register.add(Type::ClientConnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
100 self.register.add(Type::ClientDisconnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
101 self.register.add(Type::SessionSubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
102 self.register.add(Type::SessionUnsubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
103
104 Self::start(self.scx.clone(), self.cfg.clone(), self.running.clone());
105 Ok(())
106 }
107
108 #[inline]
109 async fn get_config(&self) -> Result<serde_json::Value> {
110 self.cfg.read().await.to_json()
111 }
112
113 #[inline]
114 async fn load_config(&mut self) -> Result<()> {
115 let new_cfg = self.scx.plugins.read_config_default::<PluginConfig>(self.name())?;
116 *self.cfg.write().await = new_cfg;
117 log::debug!("load_config ok, {:?}", self.cfg);
118 Ok(())
119 }
120
121 #[inline]
122 async fn start(&mut self) -> Result<()> {
123 log::info!("{} start", self.name());
124 self.register.start().await;
125 self.running.store(true, Ordering::SeqCst);
126 Ok(())
127 }
128
129 #[inline]
130 async fn stop(&mut self) -> Result<bool> {
131 log::info!("{} stop", self.name());
132 self.register.stop().await;
133 self.running.store(false, Ordering::SeqCst);
134 Ok(false)
135 }
136}
137
138struct SystemTopicHandler {
139 scx: ServerContext,
140 cfg: Arc<RwLock<PluginConfig>>,
141 nodeid: NodeId,
142}
143
144impl SystemTopicHandler {
145 fn new(scx: &ServerContext, cfg: &Arc<RwLock<PluginConfig>>) -> Self {
146 Self { scx: scx.clone(), cfg: cfg.clone(), nodeid: scx.node.id() }
147 }
148}
149
150#[async_trait]
151impl Handler for SystemTopicHandler {
152 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
153 log::debug!("param: {:?}, acc: {:?}", param, acc);
154 let now = chrono::Local::now();
155 let now_time = now.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
156 if let Some((topic, payload)) = match param {
157 Parameter::SessionCreated(session) => {
158 let body = json!({
159 "node": session.id.node(),
160 "ipaddress": session.id.remote_addr,
161 "clientid": session.id.client_id,
162 "username": session.id.username_ref(),
163 "created_at": session.created_at().await.unwrap_or_default(),
164 "time": now_time
165 });
166 let topic = format!("$SYS/brokers/{}/session/{}/created", self.nodeid, session.id.client_id);
167 Some((topic, body))
168 }
169
170 Parameter::SessionTerminated(session, reason) => {
171 let body = json!({
172 "node": session.id.node(),
173 "ipaddress": session.id.remote_addr,
174 "clientid": session.id.client_id,
175 "username": session.id.username_ref(),
176 "reason": reason.to_string(),
177 "time": now_time
178 });
179 let topic =
180 format!("$SYS/brokers/{}/session/{}/terminated", self.nodeid, session.id.client_id);
181 Some((topic, body))
182 }
183 Parameter::ClientConnected(session) => {
184 let mut body = session
185 .connect_info()
186 .await
187 .map(|connect_info| connect_info.to_hook_body())
188 .unwrap_or_default();
189 if let Some(obj) = body.as_object_mut() {
190 obj.insert(
191 "connected_at".into(),
192 serde_json::Value::Number(serde_json::Number::from(
193 session.connected_at().await.unwrap_or_default(),
194 )),
195 );
196 obj.insert(
197 "session_present".into(),
198 serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
199 );
200 obj.insert("time".into(), serde_json::Value::String(now_time));
201 }
202 let topic =
203 format!("$SYS/brokers/{}/clients/{}/connected", self.nodeid, session.id.client_id);
204 Some((topic, body))
205 }
206 Parameter::ClientDisconnected(session, reason) => {
207 let body = json!({
208 "node": session.id.node(),
209 "ipaddress": session.id.remote_addr,
210 "clientid": session.id.client_id,
211 "username": session.id.username_ref(),
212 "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
213 "reason": reason.to_string(),
214 "time": now_time
215 });
216 let topic =
217 format!("$SYS/brokers/{}/clients/{}/disconnected", self.nodeid, session.id.client_id);
218 Some((topic, body))
219 }
220
221 Parameter::SessionSubscribed(session, subscribe) => {
222 let body = json!({
223 "node": session.id.node(),
224 "ipaddress": session.id.remote_addr,
225 "clientid": session.id.client_id,
226 "username": session.id.username_ref(),
227 "topic": subscribe.topic_filter,
228 "opts": subscribe.opts.to_json(),
229 "time": now_time
230 });
231 let topic =
232 format!("$SYS/brokers/{}/session/{}/subscribed", self.nodeid, session.id.client_id);
233 Some((topic, body))
234 }
235
236 Parameter::SessionUnsubscribed(session, unsubscribed) => {
237 let topic = unsubscribed.topic_filter.clone();
238 let body = json!({
239 "node": session.id.node(),
240 "ipaddress": session.id.remote_addr,
241 "clientid": session.id.client_id,
242 "username": session.id.username_ref(),
243 "topic": topic,
244 "time": now_time
245 });
246 let topic =
247 format!("$SYS/brokers/{}/session/{}/unsubscribed", self.nodeid, session.id.client_id);
248 Some((topic, body))
249 }
250
251 _ => {
252 log::error!("unimplemented, {param:?}");
253 None
254 }
255 } {
256 let nodeid = self.nodeid;
257 let (publish_qos, expiry_interval) = {
258 let cfg_rl = self.cfg.read().await;
259 (cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
260 };
261
262 let scx = self.scx.clone();
263 spawn(sys_publish(scx, nodeid, topic, publish_qos, payload, expiry_interval));
264 }
265 (true, acc)
266 }
267}
268
269#[inline]
270async fn sys_publish(
271 scx: ServerContext,
272 nodeid: NodeId,
273 topic: String,
274 publish_qos: QoS,
275 payload: serde_json::Value,
276 message_expiry_interval: Duration,
277) {
278 match serde_json::to_string(&payload) {
279 Ok(payload) => {
280 let from = From::from_system(Id::new(
281 nodeid,
282 0,
283 None,
284 None,
285 ClientId::from_static("system"),
286 Some(UserName::from("system")),
287 ));
288
289 let p = Box::new(rmqtt::codec::types::Publish {
290 dup: false,
291 retain: false,
292 qos: publish_qos,
293 topic: TopicName::from(topic),
294 packet_id: None,
295 payload: Bytes::from(payload),
296 properties: Some(PublishProperties::default()),
297 delay_interval: None,
298 create_time: Some(timestamp_millis()),
299 });
300
301 let p = scx.extends.hook_mgr().message_publish(None, from.clone(), &p).await.unwrap_or(p);
303
304 let storage_available = scx.extends.message_mgr().await.enable();
305
306 if let Err(e) =
307 SessionState::forwards(&scx, from, p, storage_available, Some(message_expiry_interval)).await
308 {
309 log::warn!("{e:?}");
310 }
311 }
312 Err(e) => {
313 log::error!("{e:?}");
314 }
315 }
316}