use crate::broker::router::MessageRouter;
use crate::packet::publish::PublishPacket;
use crate::time::{Duration, SystemTime};
use crate::QoS;
use bytes::Bytes;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::interval;
use tracing::debug;
#[derive(Debug)]
pub struct BrokerStats {
pub start_time: SystemTime,
pub clients_connected: AtomicUsize,
pub clients_total: AtomicU64,
pub clients_maximum: AtomicUsize,
pub messages_sent: AtomicU64,
pub messages_received: AtomicU64,
pub publish_sent: AtomicU64,
pub publish_received: AtomicU64,
pub bytes_sent: AtomicU64,
pub bytes_received: AtomicU64,
}
impl BrokerStats {
#[must_use]
pub fn new() -> Self {
Self {
start_time: SystemTime::now(),
clients_connected: AtomicUsize::new(0),
clients_total: AtomicU64::new(0),
clients_maximum: AtomicUsize::new(0),
messages_sent: AtomicU64::new(0),
messages_received: AtomicU64::new(0),
publish_sent: AtomicU64::new(0),
publish_received: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
}
}
pub fn client_connected(&self) {
let current = self.clients_connected.fetch_add(1, Ordering::Relaxed) + 1;
self.clients_total.fetch_add(1, Ordering::Relaxed);
let mut max = self.clients_maximum.load(Ordering::Relaxed);
while current > max {
match self.clients_maximum.compare_exchange_weak(
max,
current,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => max = actual,
}
}
}
pub fn client_disconnected(&self) {
self.clients_connected.fetch_sub(1, Ordering::Relaxed);
}
pub fn message_sent(&self, bytes: usize) {
self.messages_sent.fetch_add(1, Ordering::Relaxed);
self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
}
pub fn publish_sent(&self, bytes: usize) {
self.publish_sent.fetch_add(1, Ordering::Relaxed);
self.message_sent(bytes);
}
pub fn message_received(&self, bytes: usize) {
self.messages_received.fetch_add(1, Ordering::Relaxed);
self.bytes_received
.fetch_add(bytes as u64, Ordering::Relaxed);
}
pub fn publish_received(&self, bytes: usize) {
self.publish_received.fetch_add(1, Ordering::Relaxed);
self.message_received(bytes);
}
#[must_use]
pub fn uptime_seconds(&self) -> u64 {
self.start_time.elapsed().unwrap_or_default().as_secs()
}
}
impl Default for BrokerStats {
fn default() -> Self {
Self::new()
}
}
pub struct SysTopicsProvider {
router: Arc<MessageRouter>,
stats: Arc<BrokerStats>,
update_interval: Duration,
version: String,
}
impl SysTopicsProvider {
#[must_use]
pub fn new(router: Arc<MessageRouter>, stats: Arc<BrokerStats>) -> Self {
Self {
router,
stats,
update_interval: Duration::from_secs(10),
version: env!("CARGO_PKG_VERSION").to_string(),
}
}
#[must_use]
pub fn with_update_interval(mut self, interval: Duration) -> Self {
self.update_interval = interval;
self
}
#[cfg(not(target_arch = "wasm32"))]
#[must_use]
pub fn start(self) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = interval(self.update_interval);
interval.tick().await;
self.publish_static_topics().await;
loop {
interval.tick().await;
self.publish_dynamic_topics().await;
}
})
}
pub async fn publish_static_topics(&self) {
self.publish_sys_topic("$SYS/broker/version", &self.version)
.await;
self.publish_sys_topic("$SYS/broker/implementation", "mqtt-v5")
.await;
self.publish_sys_topic("$SYS/broker/protocol_version", "5.0")
.await;
}
pub async fn publish_dynamic_topics(&self) {
let uptime = self.stats.uptime_seconds();
self.publish_sys_topic("$SYS/broker/uptime", &uptime.to_string())
.await;
let connected = self.stats.clients_connected.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/clients/connected", &connected.to_string())
.await;
let total = self.stats.clients_total.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/clients/total", &total.to_string())
.await;
let maximum = self.stats.clients_maximum.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/clients/maximum", &maximum.to_string())
.await;
let msg_sent = self.stats.messages_sent.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/messages/sent", &msg_sent.to_string())
.await;
let msg_recv = self.stats.messages_received.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/messages/received", &msg_recv.to_string())
.await;
let pub_sent = self.stats.publish_sent.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/messages/publish/sent", &pub_sent.to_string())
.await;
let pub_recv = self.stats.publish_received.load(Ordering::Relaxed);
self.publish_sys_topic(
"$SYS/broker/messages/publish/received",
&pub_recv.to_string(),
)
.await;
let bytes_sent = self.stats.bytes_sent.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/bytes/sent", &bytes_sent.to_string())
.await;
let bytes_recv = self.stats.bytes_received.load(Ordering::Relaxed);
self.publish_sys_topic("$SYS/broker/bytes/received", &bytes_recv.to_string())
.await;
let retained_count = self.router.retained_count().await;
self.publish_sys_topic("$SYS/broker/retained/count", &retained_count.to_string())
.await;
let topic_count = self.router.topic_count().await;
self.publish_sys_topic("$SYS/broker/subscriptions/count", &topic_count.to_string())
.await;
}
pub async fn publish_sys_topic(&self, topic: &str, value: &str) {
let packet = PublishPacket::new(
topic,
Bytes::copy_from_slice(value.as_bytes()),
QoS::AtMostOnce,
)
.with_retain(true);
debug!("Publishing $SYS topic: {} = {}", topic, value);
self.router.route_message(&packet, None).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_broker_stats_client_tracking() {
let stats = BrokerStats::new();
stats.client_connected();
stats.client_connected();
stats.client_connected();
assert_eq!(stats.clients_connected.load(Ordering::Relaxed), 3);
assert_eq!(stats.clients_total.load(Ordering::Relaxed), 3);
assert_eq!(stats.clients_maximum.load(Ordering::Relaxed), 3);
stats.client_disconnected();
assert_eq!(stats.clients_connected.load(Ordering::Relaxed), 2);
assert_eq!(stats.clients_total.load(Ordering::Relaxed), 3);
assert_eq!(stats.clients_maximum.load(Ordering::Relaxed), 3);
stats.client_connected();
stats.client_connected();
assert_eq!(stats.clients_connected.load(Ordering::Relaxed), 4);
assert_eq!(stats.clients_total.load(Ordering::Relaxed), 5);
assert_eq!(stats.clients_maximum.load(Ordering::Relaxed), 4);
}
#[test]
fn test_broker_stats_message_tracking() {
let stats = BrokerStats::new();
stats.message_sent(100);
stats.publish_sent(200);
assert_eq!(stats.messages_sent.load(Ordering::Relaxed), 2);
assert_eq!(stats.publish_sent.load(Ordering::Relaxed), 1);
assert_eq!(stats.bytes_sent.load(Ordering::Relaxed), 300);
stats.message_received(150);
stats.publish_received(250);
assert_eq!(stats.messages_received.load(Ordering::Relaxed), 2);
assert_eq!(stats.publish_received.load(Ordering::Relaxed), 1);
assert_eq!(stats.bytes_received.load(Ordering::Relaxed), 400);
}
#[test]
fn test_broker_stats_uptime() {
let stats = BrokerStats::new();
std::thread::sleep(Duration::from_millis(10));
assert!(stats.uptime_seconds() < 1000); }
#[tokio::test]
async fn test_sys_topics_provider() {
let router = Arc::new(MessageRouter::new());
let stats = Arc::new(BrokerStats::new());
let provider = SysTopicsProvider::new(Arc::clone(&router), Arc::clone(&stats))
.with_update_interval(Duration::from_millis(100));
stats.client_connected();
stats.publish_sent(100);
stats.publish_received(200);
let handle = provider.start();
tokio::time::sleep(Duration::from_millis(150)).await;
let retained = router.get_retained_messages("$SYS/#").await;
assert!(!retained.is_empty());
let version_msg = retained
.iter()
.find(|msg| msg.topic_name == "$SYS/broker/version");
assert!(version_msg.is_some());
let clients_msg = retained
.iter()
.find(|msg| msg.topic_name == "$SYS/broker/clients/connected");
assert!(clients_msg.is_some());
assert_eq!(&clients_msg.unwrap().payload[..], b"1");
handle.abort();
}
}