1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde_json::{self, json};
11use tokio::{spawn, sync::RwLock, time::sleep};
12
13use rmqtt::{
14 codec::v5::PublishProperties,
15 context::ServerContext,
16 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
17 macros::Plugin,
18 plugin::{PackageInfo, Plugin},
19 register,
20 session::SessionState,
21 types::{ClientId, From, Id, NodeId, QoS, TopicName, UserName},
22 utils::timestamp_millis,
23 Result,
24};
25
26use config::PluginConfig;
27use rmqtt::types::{CodecPublish, Publish};
28
29mod config;
30
31register!(SystemTopicPlugin::new);
32
33#[derive(Plugin)]
34struct SystemTopicPlugin {
35 scx: ServerContext,
36 register: Box<dyn Register>,
37 cfg: Arc<RwLock<PluginConfig>>,
38 running: Arc<AtomicBool>,
39}
40
41impl SystemTopicPlugin {
42 #[inline]
43 async fn new<N: Into<String>>(scx: ServerContext, name: N) -> Result<Self> {
44 let name = name.into();
45 let cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
46 log::info!("{name} SystemTopicPlugin cfg: {cfg:?}");
47 let register = scx.extends.hook_mgr().register();
48 let cfg = Arc::new(RwLock::new(cfg));
49 let running = Arc::new(AtomicBool::new(false));
50 Ok(Self { scx, register, cfg, running })
51 }
52
53 fn start(scx: ServerContext, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
54 spawn(async move {
55 let min = Duration::from_secs(1);
56 loop {
57 let (publish_interval, publish_qos, expiry_interval) = {
58 let cfg_rl = cfg.read().await;
59 (cfg_rl.publish_interval, cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
60 };
61
62 let publish_interval = if publish_interval < min { min } else { publish_interval };
63 sleep(publish_interval).await;
64 if running.load(Ordering::SeqCst) {
65 Self::send_stats(&scx, publish_qos, expiry_interval).await;
66 Self::send_metrics(&scx, publish_qos, expiry_interval).await;
67 }
68 }
69 });
70 }
71
72 async fn send_stats(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
75 let payload = scx.stats.clone(scx).await.to_json(scx).await;
76 let nodeid = scx.node.id();
77 let topic = format!("$SYS/brokers/{nodeid}/stats");
78 sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
79 }
80
81 async fn send_metrics(scx: &ServerContext, publish_qos: QoS, expiry_interval: Duration) {
84 let payload = scx.metrics.to_json();
85 let nodeid = scx.node.id();
86 let topic = format!("$SYS/brokers/{nodeid}/metrics");
87 sys_publish(scx.clone(), nodeid, topic, publish_qos, payload, expiry_interval).await;
88 }
89}
90
91#[async_trait]
92impl Plugin for SystemTopicPlugin {
93 #[inline]
94 async fn init(&mut self) -> Result<()> {
95 log::info!("{} init", self.name());
96 let cfg = &self.cfg;
97
98 self.register.add(Type::SessionCreated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
99 self.register.add(Type::SessionTerminated, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
100 self.register.add(Type::ClientConnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
101 self.register.add(Type::ClientDisconnected, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
102 self.register.add(Type::SessionSubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
103 self.register.add(Type::SessionUnsubscribed, Box::new(SystemTopicHandler::new(&self.scx, cfg))).await;
104
105 Self::start(self.scx.clone(), self.cfg.clone(), self.running.clone());
106 Ok(())
107 }
108
109 #[inline]
110 async fn get_config(&self) -> Result<serde_json::Value> {
111 self.cfg.read().await.to_json()
112 }
113
114 #[inline]
115 async fn load_config(&mut self) -> Result<()> {
116 let new_cfg = self.scx.plugins.read_config_default::<PluginConfig>(self.name())?;
117 *self.cfg.write().await = new_cfg;
118 log::debug!("load_config ok, {:?}", self.cfg);
119 Ok(())
120 }
121
122 #[inline]
123 async fn start(&mut self) -> Result<()> {
124 log::info!("{} start", self.name());
125 self.register.start().await;
126 self.running.store(true, Ordering::SeqCst);
127 Ok(())
128 }
129
130 #[inline]
131 async fn stop(&mut self) -> Result<bool> {
132 log::info!("{} stop", self.name());
133 self.register.stop().await;
134 self.running.store(false, Ordering::SeqCst);
135 Ok(false)
136 }
137}
138
139struct SystemTopicHandler {
140 scx: ServerContext,
141 cfg: Arc<RwLock<PluginConfig>>,
142 nodeid: NodeId,
143}
144
145impl SystemTopicHandler {
146 fn new(scx: &ServerContext, cfg: &Arc<RwLock<PluginConfig>>) -> Self {
147 Self { scx: scx.clone(), cfg: cfg.clone(), nodeid: scx.node.id() }
148 }
149}
150
151#[async_trait]
152impl Handler for SystemTopicHandler {
153 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
154 log::debug!("param: {param:?}, acc: {acc:?}");
155 let now = chrono::Local::now();
156 let now_time = now.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
157 if let Some((topic, payload)) = match param {
158 Parameter::SessionCreated(session) => {
159 let body = json!({
160 "node": session.id.node(),
161 "ipaddress": session.id.remote_addr,
162 "clientid": session.id.client_id,
163 "username": session.id.username_ref(),
164 "created_at": session.created_at().await.unwrap_or_default(),
165 "time": now_time
166 });
167 let topic = format!("$SYS/brokers/{}/session/{}/created", self.nodeid, session.id.client_id);
168 Some((topic, body))
169 }
170
171 Parameter::SessionTerminated(session, reason) => {
172 let body = json!({
173 "node": session.id.node(),
174 "ipaddress": session.id.remote_addr,
175 "clientid": session.id.client_id,
176 "username": session.id.username_ref(),
177 "reason": reason.to_string(),
178 "time": now_time
179 });
180 let topic =
181 format!("$SYS/brokers/{}/session/{}/terminated", self.nodeid, session.id.client_id);
182 Some((topic, body))
183 }
184 Parameter::ClientConnected(session) => {
185 let mut body = session
186 .connect_info()
187 .await
188 .map(|connect_info| connect_info.to_hook_body())
189 .unwrap_or_default();
190 if let Some(obj) = body.as_object_mut() {
191 obj.insert(
192 "connected_at".into(),
193 serde_json::Value::Number(serde_json::Number::from(
194 session.connected_at().await.unwrap_or_default(),
195 )),
196 );
197 obj.insert(
198 "session_present".into(),
199 serde_json::Value::Bool(session.session_present().await.unwrap_or_default()),
200 );
201 obj.insert("time".into(), serde_json::Value::String(now_time));
202 }
203 let topic =
204 format!("$SYS/brokers/{}/clients/{}/connected", self.nodeid, session.id.client_id);
205 Some((topic, body))
206 }
207 Parameter::ClientDisconnected(session, reason) => {
208 let body = json!({
209 "node": session.id.node(),
210 "ipaddress": session.id.remote_addr,
211 "clientid": session.id.client_id,
212 "username": session.id.username_ref(),
213 "disconnected_at": session.disconnected_at().await.unwrap_or_default(),
214 "reason": reason.to_string(),
215 "time": now_time
216 });
217 let topic =
218 format!("$SYS/brokers/{}/clients/{}/disconnected", self.nodeid, session.id.client_id);
219 Some((topic, body))
220 }
221
222 Parameter::SessionSubscribed(session, subscribe) => {
223 let body = json!({
224 "node": session.id.node(),
225 "ipaddress": session.id.remote_addr,
226 "clientid": session.id.client_id,
227 "username": session.id.username_ref(),
228 "topic": subscribe.topic_filter,
229 "opts": subscribe.opts.to_json(),
230 "time": now_time
231 });
232 let topic =
233 format!("$SYS/brokers/{}/session/{}/subscribed", self.nodeid, session.id.client_id);
234 Some((topic, body))
235 }
236
237 Parameter::SessionUnsubscribed(session, unsubscribed) => {
238 let topic = unsubscribed.topic_filter.clone();
239 let body = json!({
240 "node": session.id.node(),
241 "ipaddress": session.id.remote_addr,
242 "clientid": session.id.client_id,
243 "username": session.id.username_ref(),
244 "topic": topic,
245 "time": now_time
246 });
247 let topic =
248 format!("$SYS/brokers/{}/session/{}/unsubscribed", self.nodeid, session.id.client_id);
249 Some((topic, body))
250 }
251
252 _ => {
253 log::error!("unimplemented, {param:?}");
254 None
255 }
256 } {
257 let nodeid = self.nodeid;
258 let (publish_qos, expiry_interval) = {
259 let cfg_rl = self.cfg.read().await;
260 (cfg_rl.publish_qos, cfg_rl.message_expiry_interval)
261 };
262
263 let scx = self.scx.clone();
264 spawn(sys_publish(scx, nodeid, topic, publish_qos, payload, expiry_interval));
265 }
266 (true, acc)
267 }
268}
269
270#[inline]
271async fn sys_publish(
272 scx: ServerContext,
273 nodeid: NodeId,
274 topic: String,
275 publish_qos: QoS,
276 payload: serde_json::Value,
277 message_expiry_interval: Duration,
278) {
279 match serde_json::to_string(&payload) {
280 Ok(payload) => {
281 let from = From::from_system(Id::new(
282 nodeid,
283 0,
284 None,
285 None,
286 ClientId::from_static("system"),
287 Some(UserName::from("system")),
288 ));
289
290 let p = CodecPublish {
291 dup: false,
292 retain: false,
293 qos: publish_qos,
294 topic: TopicName::from(topic),
295 packet_id: None,
296 payload: Bytes::from(payload),
297 properties: Some(PublishProperties::default()),
298 };
299 let p = <CodecPublish as Into<Publish>>::into(p).create_time(timestamp_millis());
300
301 let p = scx.extends.hook_mgr().message_publish(None, from.clone(), &p).await.unwrap_or(p);
303
304 let storage_available = scx.extends.message_mgr().await.enable();
305
306 if let Err(e) =
307 SessionState::forwards(&scx, from, p, storage_available, Some(message_expiry_interval)).await
308 {
309 log::warn!("{e:?}");
310 }
311 }
312 Err(e) => {
313 log::error!("{e:?}");
314 }
315 }
316}