Skip to main content

rmqtt/
node.rs

1//! MQTT Broker Node Management Core
2//!
3//! Provides centralized node monitoring and resource management for MQTT broker clusters, implementing:
4//! 1. **Node State Tracking**:
5//!    - Uptime calculation with chrono integration
6//!    - System load monitoring (1/5/15-minute averages)
7//!    - Memory/Disk usage statistics collection
8//! 2. **Cluster Health Management**:
9//!    - Busy state detection with configurable thresholds
10//!    - CPU load aggregation using systemstat
11//!    - Graceful degradation through max_busy_loadavg/max_busy_cpuloadavg
12//! 3. **Protocol Implementation**:
13//!    - gRPC server/client integration for cluster communication
14//!    - JSON serialization of broker/node status (BrokerInfo/NodeInfo)
15//!    - Version metadata exposure (Rustc + build version)
16//!
//! Key components:
//! - Node health reporting through `NodeStatus` tracking (running / stopped / error)
//! - Resource monitoring for connection capacity planning
//! - Distributed architecture support via gRPC
21//!
22
23use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27use serde_json::json;
28use systemstat::Platform;
29
30use crate::context::ServerContext;
31#[cfg(feature = "grpc")]
32use crate::grpc::{GrpcClient, GrpcServer};
33use crate::types::{NodeId, TimestampMillis};
34use crate::utils::timestamp_millis;
35
/// Crate version taken from Cargo's `CARGO_PKG_VERSION` at compile time; part of `Node::version()`.
const VERSION: &str = env!("CARGO_PKG_VERSION");

/// rustc version string read from the `RUSTC_VERSION` environment variable at compile time
/// (presumably exported by a build script — TODO confirm); returned by `Node::rustc_version()`.
const RUSTC_VERSION: &str = env!("RUSTC_VERSION");

/// Build timestamp read from the `RUSTC_BUILD_TIME` environment variable at compile time
/// (presumably exported by a build script — TODO confirm); appended in `Node::version()`.
const RUSTC_BUILD_TIME: &str = env!("RUSTC_BUILD_TIME");
41
/// Descriptor of the local broker node: its identity, start time, and the thresholds
/// used to decide whether the node is currently too busy.
#[derive(Debug)]
pub struct Node {
    /// Identifier of this node.
    pub id: NodeId,
    /// Wall-clock time at which this `Node` was constructed; the reported uptime is measured from it.
    pub start_time: chrono::DateTime<chrono::Local>,
    /// The node counts as busy when the 1-minute load average exceeds this value.
    max_busy_loadavg: f32,
    /// The node counts as busy when the value returned by `cpuload()` exceeds this value.
    max_busy_cpuloadavg: f32,
    /// How long a computed busy verdict is reused by `sys_is_busy()` before being recomputed.
    pub(crate) busy_update_interval: Duration,
    /// Latest CPU load sample kept as a scaled integer so it can live in an atomic;
    /// written by `update_cpuload()` and read back (divided down) by `cpuload()`.
    cpuload: AtomicI64,
}
51
52impl Default for Node {
53    fn default() -> Self {
54        Self::new(0, 80.0, 90.0, Duration::from_secs(2))
55    }
56}
57
58impl Node {
59    pub fn new(
60        id: NodeId,
61        max_busy_loadavg: f32,
62        max_busy_cpuloadavg: f32,
63        busy_update_interval: Duration,
64    ) -> Self {
65        Self {
66            id,
67            start_time: chrono::Local::now(),
68            max_busy_loadavg,
69            max_busy_cpuloadavg,
70            busy_update_interval,
71            cpuload: AtomicI64::new(0),
72        }
73    }
74
75    #[inline]
76    pub fn id(&self) -> NodeId {
77        self.id
78    }
79
80    #[inline]
81    pub async fn name(&self, scx: &ServerContext, id: NodeId) -> String {
82        scx.extends.shared().await.node_name(id)
83    }
84
85    #[cfg(feature = "grpc")]
86    #[inline]
87    pub async fn new_grpc_client(
88        &self,
89        remote_addr: &str,
90        connect_timeout: Duration,
91        client_concurrency_limit: usize,
92        _batch_size: usize,
93    ) -> crate::Result<GrpcClient> {
94        GrpcClient::new(remote_addr, connect_timeout, client_concurrency_limit).await
95    }
96
97    #[cfg(feature = "grpc")]
98    pub fn start_grpc_server(
99        &self,
100        scx: ServerContext,
101        server_addr: std::net::SocketAddr,
102        reuseaddr: bool,
103        reuseport: bool,
104    ) {
105        tokio::spawn(async move {
106            if let Err(e) = GrpcServer::new(scx).listen_and_serve(server_addr, reuseaddr, reuseport).await {
107                log::error!("listen and serve failure, {e:?}, laddr: {server_addr:?}");
108            }
109        });
110    }
111
112    #[inline]
113    pub async fn status(&self, scx: &ServerContext) -> NodeStatus {
114        match scx.extends.shared().await.health_status().await {
115            Ok(status) => {
116                if status.running {
117                    NodeStatus::Running(1)
118                } else {
119                    NodeStatus::Stop
120                }
121            }
122            Err(e) => NodeStatus::Error(e.to_string()),
123        }
124    }
125
126    #[inline]
127    fn uptime(&self) -> String {
128        to_uptime((chrono::Local::now() - self.start_time).num_seconds())
129    }
130
131    #[inline]
132    pub fn version() -> String {
133        format!("rmqtt/{VERSION}-{RUSTC_BUILD_TIME}")
134    }
135
136    #[inline]
137    pub fn rustc_version() -> String {
138        RUSTC_VERSION.to_string()
139    }
140
141    #[inline]
142    pub async fn broker_info(&self, scx: &ServerContext) -> BrokerInfo {
143        let node_id = self.id;
144        BrokerInfo {
145            version: Self::version(),
146            rustc_version: Self::rustc_version(),
147            uptime: self.uptime(),
148            sysdescr: "RMQTT Broker".into(),
149            node_status: self.status(scx).await,
150            node_id,
151            node_name: self.name(scx, node_id).await, //Runtime::instance().extends.shared().await.node_name(node_id),
152            datetime: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
153        }
154    }
155
156    #[inline]
157    pub async fn node_info(&self, scx: &ServerContext) -> NodeInfo {
158        let node_id = self.id;
159
160        let sys = systemstat::System::new();
161        let boottime = sys.boot_time().map(|t| t.to_string()).unwrap_or_default();
162        let loadavg = sys.load_average();
163        let mem_info = sys.memory();
164
165        let (disk_total, disk_free) = if let Ok(mounts) = sys.mounts() {
166            let total = mounts.iter().map(|m| m.total.as_u64()).sum();
167            let free = mounts.iter().map(|m| m.free.as_u64()).sum();
168            (total, free)
169        } else {
170            (0, 0)
171        };
172
173        NodeInfo {
174            connections: scx.connections.count(),
175            boottime,
176            load1: loadavg.as_ref().map(|l| l.one).unwrap_or_default(),
177            load5: loadavg.as_ref().map(|l| l.five).unwrap_or_default(),
178            load15: loadavg.as_ref().map(|l| l.fifteen).unwrap_or_default(),
179            memory_total: mem_info.as_ref().map(|m| m.total.as_u64()).unwrap_or_default(),
180            memory_free: mem_info.as_ref().map(|m| m.free.as_u64()).unwrap_or_default(),
181            memory_used: mem_info
182                .as_ref()
183                .map(|m| systemstat::saturating_sub_bytes(m.total, m.free).as_u64())
184                .unwrap_or_default(),
185            disk_total,
186            disk_free,
187            node_status: self.status(scx).await,
188            node_id,
189            node_name: self.name(scx, node_id).await, //Runtime::instance().extends.shared().await.node_name(node_id),
190            uptime: self.uptime(),
191            version: Self::version(),
192            rustc_version: Self::rustc_version(),
193        }
194    }
195
196    #[inline]
197    fn _is_busy(&self) -> bool {
198        let sys = systemstat::System::new();
199        let cpuload = self.cpuload();
200
201        let loadavg = sys.load_average();
202        let load1 = loadavg.as_ref().map(|l| l.one).unwrap_or_default();
203
204        load1 > self.max_busy_loadavg || cpuload > self.max_busy_cpuloadavg
205    }
206
207    #[inline]
208    pub fn sys_is_busy(&self) -> bool {
209        static CACHED_BUSY: AtomicBool = AtomicBool::new(false);
210        static CACHED_TIME: AtomicI64 = AtomicI64::new(0);
211
212        let now = timestamp_millis();
213
214        let last_update = CACHED_TIME.load(Ordering::Relaxed);
215
216        if now - last_update < self.busy_update_interval.as_millis() as TimestampMillis {
217            return CACHED_BUSY.load(Ordering::Relaxed);
218        }
219
220        let busy = self._is_busy();
221
222        CACHED_BUSY.store(busy, Ordering::Relaxed);
223        CACHED_TIME.store(now, Ordering::Relaxed);
224
225        busy
226    }
227
228    #[inline]
229    pub fn cpuload(&self) -> f32 {
230        //0.0 - 100.0
231        self.cpuload.load(Ordering::SeqCst) as f32 / 100.0
232    }
233
234    pub async fn update_cpuload(&self) {
235        let sys = systemstat::System::new();
236        let cpuload_aggr = sys.cpu_load_aggregate().ok();
237        tokio::time::sleep(Duration::from_secs(2)).await;
238        let cpuload_aggr = cpuload_aggr.and_then(|dm| dm.done().ok());
239        let cpuload = cpuload_aggr
240            .map(|cpuload_aggr| {
241                let aggregate1 =
242                    cpuload_aggr.user + cpuload_aggr.nice + cpuload_aggr.system + cpuload_aggr.interrupt;
243                aggregate1 * 100.0
244            })
245            .unwrap_or_default();
246        self.cpuload.store(cpuload as i64, Ordering::SeqCst);
247    }
248}
249
/// Summary of a broker node as built by `Node::broker_info`; serializable with serde.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BrokerInfo {
    /// Version string as produced by `Node::version()`.
    pub version: String,
    /// Compiler version the broker was built with.
    pub rustc_version: String,
    /// Human-readable uptime as produced by `to_uptime`.
    pub uptime: String,
    /// System description; `Node::broker_info` sets it to "RMQTT Broker".
    pub sysdescr: String,
    /// Health status at the time the summary was collected.
    pub node_status: NodeStatus,
    /// Identifier of the described node.
    pub node_id: NodeId,
    /// Display name of the described node.
    pub node_name: String,
    /// Local collection time formatted as `%Y-%m-%d %H:%M:%S`.
    pub datetime: String,
}
261
262impl BrokerInfo {
263    pub fn to_json(&self) -> serde_json::Value {
264        json!({
265            "version": self.version,
266            "rustc_version": self.rustc_version,
267            "uptime": self.uptime,
268            "sysdescr": self.sysdescr,
269            "running": self.node_status.is_running(),
270            "node_id": self.node_id,
271            "node_name": self.node_name,
272            "datetime": self.datetime
273        })
274    }
275}
276
/// Snapshot of a node's connections and system resources as built by `Node::node_info`.
///
/// Several snapshots can be combined with `NodeInfo::add` into an aggregate.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct NodeInfo {
    /// Connection count reported by the server context.
    pub connections: isize,
    /// System boot time rendered as a string; empty when unavailable.
    pub boottime: String,
    /// 1-minute load average; 0 when unavailable.
    pub load1: f32,
    /// 5-minute load average; 0 when unavailable.
    pub load5: f32,
    /// 15-minute load average; 0 when unavailable.
    pub load15: f32,
    // Not yet collected:
    // pub max_fds: usize,
    // pub cpu_num: String,
    // pub cpu_speed: String,
    /// Total memory in bytes (systemstat `ByteSize::as_u64`).
    pub memory_total: u64,
    /// Memory in use in bytes, computed as total minus free (saturating at 0).
    pub memory_used: u64,
    /// Free memory in bytes.
    pub memory_free: u64,
    /// Total disk capacity in bytes, summed over every mount systemstat reports.
    pub disk_total: u64,
    /// Free disk space in bytes, summed over every mount systemstat reports.
    pub disk_free: u64,
    // Not yet collected:
    // pub os_release: String,
    // pub os_type: String,
    // pub proc_total: String,
    /// Health status of the node (a running-node count after aggregation).
    pub node_status: NodeStatus,
    /// Identifier of the described node.
    pub node_id: NodeId,
    /// Display name of the described node.
    pub node_name: String,
    /// Human-readable uptime as produced by `to_uptime`.
    pub uptime: String,
    /// Broker version string.
    pub version: String,
    /// Compiler version the broker was built with.
    pub rustc_version: String,
}
302
303impl NodeInfo {
304    #[inline]
305    pub fn to_json(&self) -> serde_json::Value {
306        json!({
307            "connections":  self.connections,
308            "boottime":  self.boottime,
309            "load1":  self.load1,
310            "load5":  self.load5,
311            "load15":  self.load15,
312            // "max_fds":  self.max_fds,
313            // "cpu_num":  self.cpu_num,
314            // "cpu_speed":  self.cpu_speed,
315            "memory_total":  self.memory_total,
316            "memory_used":  self.memory_used,
317            "memory_free":  self.memory_free,
318            "disk_total":  self.disk_total,
319            "disk_free":  self.disk_free,
320            // "os_release":  self.os_release,
321            // "os_type":  self.os_type,
322            // "proc_total":  self.proc_total,
323            "running":  self.node_status.is_running(),
324            "node_id":  self.node_id,
325            "node_name":  self.node_name,
326            "uptime":  self.uptime,
327            "version":  self.version,
328            "rustc_version": self.rustc_version,
329        })
330    }
331
332    #[inline]
333    pub fn add(&mut self, other: &NodeInfo) {
334        self.load1 += other.load1;
335        self.load5 += other.load5;
336        self.load15 += other.load15;
337        self.memory_total += other.memory_total;
338        self.memory_used += other.memory_used;
339        self.memory_free += other.memory_free;
340        self.disk_total += other.disk_total;
341        self.disk_free += other.disk_free;
342        self.node_status = {
343            let c = match (&self.node_status, &other.node_status) {
344                (NodeStatus::Running(c1), NodeStatus::Running(c2)) => *c1 + *c2,
345                (NodeStatus::Running(c1), _) => *c1,
346                (_, NodeStatus::Running(c2)) => *c2,
347                (_, _) => 0,
348            };
349            NodeStatus::Running(c)
350        };
351    }
352}
353
/// Health state of a node, or of an aggregate of nodes after `NodeInfo::add`.
///
/// Variant names are serialized in lowercase (`running`, `stop`, `error`).
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum NodeStatus {
    /// Running; the payload counts running nodes (`Node::status` reports `1` for itself).
    Running(usize),
    /// The health check reported the node as not running.
    Stop,
    /// The health check itself failed; carries the error message.
    Error(String),
}
361
362impl NodeStatus {
363    #[inline]
364    pub fn running(&self) -> usize {
365        if let NodeStatus::Running(c) = self {
366            *c
367        } else {
368            0
369        }
370    }
371
372    #[inline]
373    pub fn is_running(&self) -> bool {
374        matches!(self, NodeStatus::Running(_))
375    }
376}
377
378impl Default for NodeStatus {
379    #[inline]
380    fn default() -> Self {
381        NodeStatus::Stop
382    }
383}
384
/// Formats a duration in whole seconds as `"<d> days <h> hours, <m> minutes, <s> seconds"`.
///
/// Components come from truncating integer division and remainder. A negative input
/// therefore yields zero or negative components rather than an error.
#[inline]
pub fn to_uptime(uptime: i64) -> String {
    let (minutes_total, seconds) = (uptime / 60, uptime % 60);
    let (hours_total, minutes) = (minutes_total / 60, minutes_total % 60);
    let (days, hours) = (hours_total / 24, hours_total % 24);
    format!("{days} days {hours} hours, {minutes} minutes, {seconds} seconds")
}