1#![deny(unsafe_code)]
2
3use async_trait::async_trait;
4
5use rmqtt::{
6 codec::v3::ConnectAckReason as ConnectAckReasonV3,
7 codec::v5::ConnectAckReason as ConnectAckReasonV5,
8 context::ServerContext,
9 hook::Priority,
10 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
11 macros::Plugin,
12 plugin::{PackageInfo, Plugin},
13 register,
14 types::{ConnectAckReason, FromType},
15 Result,
16};
17
18register!(CounterPlugin::new);
19
20#[derive(Plugin)]
21struct CounterPlugin {
22 scx: ServerContext,
23 register: Box<dyn Register>,
24}
25
26impl CounterPlugin {
27 #[inline]
28 async fn new<S: Into<String>>(scx: ServerContext, _name: S) -> Result<Self> {
29 let register = scx.extends.hook_mgr().register();
30 Ok(Self { scx, register })
31 }
32}
33
34#[async_trait]
35impl Plugin for CounterPlugin {
36 #[inline]
37 async fn init(&mut self) -> Result<()> {
38 log::info!("{} init", self.name());
39 self.register
40 .add_priority(Type::ClientConnect, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
41 .await;
42 self.register
43 .add_priority(Type::ClientAuthenticate, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
44 .await;
45 self.register
46 .add_priority(Type::ClientConnack, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
47 .await;
48 self.register
49 .add_priority(Type::ClientConnected, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
50 .await;
51 self.register
52 .add_priority(Type::ClientDisconnected, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
53 .await;
54 self.register
55 .add_priority(Type::ClientSubscribe, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
56 .await;
57 self.register
58 .add_priority(Type::ClientUnsubscribe, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
59 .await;
60
61 self.register
62 .add_priority(
63 Type::ClientSubscribeCheckAcl,
64 Priority::MAX,
65 Box::new(CounterHandler::new(&self.scx)),
66 )
67 .await;
68 self.register
69 .add_priority(
70 Type::MessagePublishCheckAcl,
71 Priority::MAX,
72 Box::new(CounterHandler::new(&self.scx)),
73 )
74 .await;
75
76 self.register
77 .add_priority(Type::SessionCreated, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
78 .await;
79 self.register
80 .add_priority(Type::SessionTerminated, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
81 .await;
82 self.register
83 .add_priority(Type::SessionSubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
84 .await;
85 self.register
86 .add_priority(Type::SessionUnsubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
87 .await;
88
89 self.register
90 .add_priority(Type::MessagePublish, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
91 .await;
92 self.register
93 .add_priority(Type::MessageDelivered, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
94 .await;
95 self.register
96 .add_priority(Type::MessageAcked, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
97 .await;
98 self.register
99 .add_priority(Type::MessageDropped, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
100 .await;
101 self.register
102 .add_priority(Type::MessageNonsubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
103 .await;
104
105 Ok(())
106 }
107
108 #[inline]
109 async fn load_config(&mut self) -> Result<()> {
110 Ok(())
111 }
112
113 #[inline]
114 async fn start(&mut self) -> Result<()> {
115 log::info!("{} start", self.name());
116 self.register.start().await;
117 Ok(())
118 }
119
120 #[inline]
121 async fn stop(&mut self) -> Result<bool> {
122 log::warn!("{} stop, the Counter plug-in, it cannot be stopped", self.name());
123 Ok(false)
124 }
125}
126
127struct CounterHandler {
128 scx: ServerContext,
129}
130
131impl CounterHandler {
132 fn new(scx: &ServerContext) -> Self {
133 Self { scx: scx.clone() }
134 }
135}
136
137#[async_trait]
138impl Handler for CounterHandler {
139 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
140 match param {
141 Parameter::ClientConnect(connect_info) => {
142 self.scx.metrics.client_connect_inc();
143 if connect_info.username().is_none() {
144 self.scx.metrics.client_auth_anonymous_inc();
145 }
146 }
147 Parameter::ClientAuthenticate(_) => {
148 self.scx.metrics.client_authenticate_inc();
149 }
150 Parameter::ClientConnack(connect_info, reason) => {
151 self.scx.metrics.client_connack_inc();
152 match **reason {
153 ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
154 | ConnectAckReason::V5(ConnectAckReasonV5::Success) => {}
155 ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
156 | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
157 | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
158 | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword) => {
159 self.scx.metrics.client_connack_error_inc();
160 self.scx.metrics.client_connack_auth_error_inc();
161 if connect_info.username().is_none() {
162 self.scx.metrics.client_auth_anonymous_error_inc();
163 }
164 }
165 ConnectAckReason::V3(ConnectAckReasonV3::ServiceUnavailable)
166 | ConnectAckReason::V5(ConnectAckReasonV5::ServerUnavailable) => {
167 self.scx.metrics.client_connack_error_inc();
168 self.scx.metrics.client_connack_unavailable_error_inc();
169 }
170 _ => {
171 self.scx.metrics.client_connack_error_inc();
172 }
173 }
174 }
175 Parameter::ClientConnected(session) => {
176 self.scx.metrics.client_connected_inc();
177 if session.session_present().await.unwrap_or_default() {
178 self.scx.metrics.session_resumed_inc();
179 }
180 }
181 Parameter::ClientDisconnected(_session, _r) => {
182 self.scx.metrics.client_disconnected_inc();
183 }
184 Parameter::ClientSubscribeCheckAcl(_session, _s) => {
185 self.scx.metrics.client_subscribe_check_acl_inc();
186 }
187 Parameter::ClientSubscribe(_s, _sub) => {
188 self.scx.metrics.client_subscribe_inc();
189 }
190 Parameter::ClientUnsubscribe(_s, _unsub) => {
191 self.scx.metrics.client_unsubscribe_inc();
192 }
193
194 Parameter::SessionCreated(_session) => {
195 self.scx.metrics.session_created_inc();
196 }
197 Parameter::SessionTerminated(_session, _r) => {
198 self.scx.metrics.session_terminated_inc();
199 }
200 Parameter::SessionSubscribed(_s, _sub) => {
201 self.scx.metrics.session_subscribed_inc();
202 }
203 Parameter::SessionUnsubscribed(_s, _unsub) => {
204 self.scx.metrics.session_unsubscribed_inc();
205 }
206
207 Parameter::MessagePublishCheckAcl(_session, _p) => {
208 self.scx.metrics.client_publish_check_acl_inc();
209 }
210 Parameter::MessagePublish(_session, from, _p) => {
211 self.scx.metrics.messages_publish_inc();
218 match from.typ() {
219 FromType::Custom => self.scx.metrics.messages_publish_custom_inc(),
220 FromType::Admin => self.scx.metrics.messages_publish_admin_inc(),
221 FromType::System => self.scx.metrics.messages_publish_system_inc(),
222 FromType::LastWill => self.scx.metrics.messages_publish_lastwill_inc(),
223 FromType::Bridge => self.scx.metrics.messages_publish_bridge_inc(),
224 }
225 }
226 Parameter::MessageDelivered(_session, from, p) => {
227 self.scx.metrics.messages_delivered_inc();
228 if p.retain {
229 self.scx.metrics.messages_delivered_retain_inc()
230 }
231 match from.typ() {
232 FromType::Custom => self.scx.metrics.messages_delivered_custom_inc(),
233 FromType::Admin => self.scx.metrics.messages_delivered_admin_inc(),
234 FromType::System => self.scx.metrics.messages_delivered_system_inc(),
235 FromType::LastWill => self.scx.metrics.messages_delivered_lastwill_inc(),
236 FromType::Bridge => self.scx.metrics.messages_delivered_bridge_inc(),
237 }
238 }
239 Parameter::MessageAcked(_session, from, p) => {
240 self.scx.metrics.messages_acked_inc();
241 if p.retain {
242 self.scx.metrics.messages_acked_retain_inc()
243 }
244 match from.typ() {
245 FromType::Custom => self.scx.metrics.messages_acked_custom_inc(),
246 FromType::Admin => self.scx.metrics.messages_acked_admin_inc(),
247 FromType::System => self.scx.metrics.messages_acked_system_inc(),
248 FromType::LastWill => self.scx.metrics.messages_acked_lastwill_inc(),
249 FromType::Bridge => self.scx.metrics.messages_acked_bridge_inc(),
250 }
251 }
252 Parameter::MessageDropped(_to, _from, _p, _r) => {
253 self.scx.metrics.messages_dropped_inc(); }
255 Parameter::MessageNonsubscribed(from) => {
256 self.scx.metrics.messages_nonsubscribed_inc();
257 match from.typ() {
258 FromType::Custom => self.scx.metrics.messages_nonsubscribed_custom_inc(),
259 FromType::Admin => self.scx.metrics.messages_nonsubscribed_admin_inc(),
260 FromType::System => self.scx.metrics.messages_nonsubscribed_system_inc(),
261 FromType::LastWill => self.scx.metrics.messages_nonsubscribed_lastwill_inc(),
262 FromType::Bridge => self.scx.metrics.messages_nonsubscribed_bridge_inc(),
263 }
264 }
265
266 _ => {
267 log::error!("parameter is: {param:?}");
268 }
269 }
270 (true, acc)
271 }
272}