rmqtt_counter/
lib.rs

1#![deny(unsafe_code)]
2
3use async_trait::async_trait;
4
5use rmqtt::{
6    codec::v3::ConnectAckReason as ConnectAckReasonV3,
7    codec::v5::ConnectAckReason as ConnectAckReasonV5,
8    context::ServerContext,
9    hook::Priority,
10    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
11    macros::Plugin,
12    plugin::{PackageInfo, Plugin},
13    register,
14    types::{ConnectAckReason, FromType},
15    Result,
16};
17
18register!(CounterPlugin::new);
19
20#[derive(Plugin)]
21struct CounterPlugin {
22    scx: ServerContext,
23    register: Box<dyn Register>,
24}
25
26impl CounterPlugin {
27    #[inline]
28    async fn new<S: Into<String>>(scx: ServerContext, _name: S) -> Result<Self> {
29        let register = scx.extends.hook_mgr().register();
30        Ok(Self { scx, register })
31    }
32}
33
34#[async_trait]
35impl Plugin for CounterPlugin {
36    #[inline]
37    async fn init(&mut self) -> Result<()> {
38        log::info!("{} init", self.name());
39        self.register
40            .add_priority(Type::ClientConnect, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
41            .await;
42        self.register
43            .add_priority(Type::ClientAuthenticate, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
44            .await;
45        self.register
46            .add_priority(Type::ClientConnack, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
47            .await;
48        self.register
49            .add_priority(Type::ClientConnected, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
50            .await;
51        self.register
52            .add_priority(Type::ClientDisconnected, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
53            .await;
54        self.register
55            .add_priority(Type::ClientSubscribe, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
56            .await;
57        self.register
58            .add_priority(Type::ClientUnsubscribe, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
59            .await;
60
61        self.register
62            .add_priority(
63                Type::ClientSubscribeCheckAcl,
64                Priority::MAX,
65                Box::new(CounterHandler::new(&self.scx)),
66            )
67            .await;
68        self.register
69            .add_priority(
70                Type::MessagePublishCheckAcl,
71                Priority::MAX,
72                Box::new(CounterHandler::new(&self.scx)),
73            )
74            .await;
75
76        self.register
77            .add_priority(Type::SessionCreated, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
78            .await;
79        self.register
80            .add_priority(Type::SessionTerminated, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
81            .await;
82        self.register
83            .add_priority(Type::SessionSubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
84            .await;
85        self.register
86            .add_priority(Type::SessionUnsubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
87            .await;
88
89        self.register
90            .add_priority(Type::MessagePublish, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
91            .await;
92        self.register
93            .add_priority(Type::MessageDelivered, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
94            .await;
95        self.register
96            .add_priority(Type::MessageAcked, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
97            .await;
98        self.register
99            .add_priority(Type::MessageDropped, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
100            .await;
101        self.register
102            .add_priority(Type::MessageNonsubscribed, Priority::MAX, Box::new(CounterHandler::new(&self.scx)))
103            .await;
104
105        Ok(())
106    }
107
108    #[inline]
109    async fn load_config(&mut self) -> Result<()> {
110        Ok(())
111    }
112
113    #[inline]
114    async fn start(&mut self) -> Result<()> {
115        log::info!("{} start", self.name());
116        self.register.start().await;
117        Ok(())
118    }
119
120    #[inline]
121    async fn stop(&mut self) -> Result<bool> {
122        log::warn!("{} stop, the Counter plug-in, it cannot be stopped", self.name());
123        Ok(false)
124    }
125}
126
127struct CounterHandler {
128    scx: ServerContext,
129}
130
131impl CounterHandler {
132    fn new(scx: &ServerContext) -> Self {
133        Self { scx: scx.clone() }
134    }
135}
136
137#[async_trait]
138impl Handler for CounterHandler {
139    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
140        match param {
141            Parameter::ClientConnect(connect_info) => {
142                self.scx.metrics.client_connect_inc();
143                if connect_info.username().is_none() {
144                    self.scx.metrics.client_auth_anonymous_inc();
145                }
146            }
147            Parameter::ClientAuthenticate(_) => {
148                self.scx.metrics.client_authenticate_inc();
149            }
150            Parameter::ClientConnack(connect_info, reason) => {
151                self.scx.metrics.client_connack_inc();
152                match **reason {
153                    ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
154                    | ConnectAckReason::V5(ConnectAckReasonV5::Success) => {}
155                    ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
156                    | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
157                    | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
158                    | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword) => {
159                        self.scx.metrics.client_connack_error_inc();
160                        self.scx.metrics.client_connack_auth_error_inc();
161                        if connect_info.username().is_none() {
162                            self.scx.metrics.client_auth_anonymous_error_inc();
163                        }
164                    }
165                    ConnectAckReason::V3(ConnectAckReasonV3::ServiceUnavailable)
166                    | ConnectAckReason::V5(ConnectAckReasonV5::ServerUnavailable) => {
167                        self.scx.metrics.client_connack_error_inc();
168                        self.scx.metrics.client_connack_unavailable_error_inc();
169                    }
170                    _ => {
171                        self.scx.metrics.client_connack_error_inc();
172                    }
173                }
174            }
175            Parameter::ClientConnected(session) => {
176                self.scx.metrics.client_connected_inc();
177                if session.session_present().await.unwrap_or_default() {
178                    self.scx.metrics.session_resumed_inc();
179                }
180            }
181            Parameter::ClientDisconnected(_session, _r) => {
182                self.scx.metrics.client_disconnected_inc();
183            }
184            Parameter::ClientSubscribeCheckAcl(_session, _s) => {
185                self.scx.metrics.client_subscribe_check_acl_inc();
186            }
187            Parameter::ClientSubscribe(_s, _sub) => {
188                self.scx.metrics.client_subscribe_inc();
189            }
190            Parameter::ClientUnsubscribe(_s, _unsub) => {
191                self.scx.metrics.client_unsubscribe_inc();
192            }
193
194            Parameter::SessionCreated(_session) => {
195                self.scx.metrics.session_created_inc();
196            }
197            Parameter::SessionTerminated(_session, _r) => {
198                self.scx.metrics.session_terminated_inc();
199            }
200            Parameter::SessionSubscribed(_s, _sub) => {
201                self.scx.metrics.session_subscribed_inc();
202            }
203            Parameter::SessionUnsubscribed(_s, _unsub) => {
204                self.scx.metrics.session_unsubscribed_inc();
205            }
206
207            Parameter::MessagePublishCheckAcl(_session, _p) => {
208                self.scx.metrics.client_publish_check_acl_inc();
209            }
210            Parameter::MessagePublish(_session, from, _p) => {
211                // self.scx.metrics.messages_received_inc();  //@TODO ... elaboration
212                // match p.qos{
213                //     QoS::AtMostOnce => self.scx.metrics.messages_received_qos0_inc(),
214                //     QoS::AtLeastOnce => self.scx.metrics.messages_received_qos1_inc(),
215                //     QoS::ExactlyOnce => self.scx.metrics.messages_received_qos2_inc(),
216                // }
217                self.scx.metrics.messages_publish_inc();
218                match from.typ() {
219                    FromType::Custom => self.scx.metrics.messages_publish_custom_inc(),
220                    FromType::Admin => self.scx.metrics.messages_publish_admin_inc(),
221                    FromType::System => self.scx.metrics.messages_publish_system_inc(),
222                    FromType::LastWill => self.scx.metrics.messages_publish_lastwill_inc(),
223                    FromType::Bridge => self.scx.metrics.messages_publish_bridge_inc(),
224                }
225            }
226            Parameter::MessageDelivered(_session, from, p) => {
227                self.scx.metrics.messages_delivered_inc();
228                if p.retain {
229                    self.scx.metrics.messages_delivered_retain_inc()
230                }
231                match from.typ() {
232                    FromType::Custom => self.scx.metrics.messages_delivered_custom_inc(),
233                    FromType::Admin => self.scx.metrics.messages_delivered_admin_inc(),
234                    FromType::System => self.scx.metrics.messages_delivered_system_inc(),
235                    FromType::LastWill => self.scx.metrics.messages_delivered_lastwill_inc(),
236                    FromType::Bridge => self.scx.metrics.messages_delivered_bridge_inc(),
237                }
238            }
239            Parameter::MessageAcked(_session, from, p) => {
240                self.scx.metrics.messages_acked_inc();
241                if p.retain {
242                    self.scx.metrics.messages_acked_retain_inc()
243                }
244                match from.typ() {
245                    FromType::Custom => self.scx.metrics.messages_acked_custom_inc(),
246                    FromType::Admin => self.scx.metrics.messages_acked_admin_inc(),
247                    FromType::System => self.scx.metrics.messages_acked_system_inc(),
248                    FromType::LastWill => self.scx.metrics.messages_acked_lastwill_inc(),
249                    FromType::Bridge => self.scx.metrics.messages_acked_bridge_inc(),
250                }
251            }
252            Parameter::MessageDropped(_to, _from, _p, _r) => {
253                self.scx.metrics.messages_dropped_inc(); //@TODO ... elaboration
254            }
255            Parameter::MessageNonsubscribed(from) => {
256                self.scx.metrics.messages_nonsubscribed_inc();
257                match from.typ() {
258                    FromType::Custom => self.scx.metrics.messages_nonsubscribed_custom_inc(),
259                    FromType::Admin => self.scx.metrics.messages_nonsubscribed_admin_inc(),
260                    FromType::System => self.scx.metrics.messages_nonsubscribed_system_inc(),
261                    FromType::LastWill => self.scx.metrics.messages_nonsubscribed_lastwill_inc(),
262                    FromType::Bridge => self.scx.metrics.messages_nonsubscribed_bridge_inc(),
263                }
264            }
265
266            _ => {
267                log::error!("parameter is: {param:?}");
268            }
269        }
270        (true, acc)
271    }
272}