1use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
24use std::time::Duration;
25
26use serde::{Deserialize, Serialize};
27use serde_json::json;
28use systemstat::Platform;
29
30use crate::context::ServerContext;
31#[cfg(feature = "grpc")]
32use crate::grpc::{GrpcClient, GrpcServer};
33use crate::types::{NodeId, TimestampMillis};
34use crate::utils::timestamp_millis;
35
36const VERSION: &str = env!("CARGO_PKG_VERSION");
37
38const RUSTC_VERSION: &str = env!("RUSTC_VERSION");
39
40const RUSTC_BUILD_TIME: &str = env!("RUSTC_BUILD_TIME");
41
42#[derive(Debug)]
43pub struct Node {
44 pub id: NodeId,
45 pub start_time: chrono::DateTime<chrono::Local>,
46 max_busy_loadavg: f32,
47 max_busy_cpuloadavg: f32,
48 pub(crate) busy_update_interval: Duration,
49 cpuload: AtomicI64,
50}
51
52impl Default for Node {
53 fn default() -> Self {
54 Self::new(0, 80.0, 90.0, Duration::from_secs(2))
55 }
56}
57
58impl Node {
59 pub fn new(
60 id: NodeId,
61 max_busy_loadavg: f32,
62 max_busy_cpuloadavg: f32,
63 busy_update_interval: Duration,
64 ) -> Self {
65 Self {
66 id,
67 start_time: chrono::Local::now(),
68 max_busy_loadavg,
69 max_busy_cpuloadavg,
70 busy_update_interval,
71 cpuload: AtomicI64::new(0),
72 }
73 }
74
75 #[inline]
76 pub fn id(&self) -> NodeId {
77 self.id
78 }
79
80 #[inline]
81 pub async fn name(&self, scx: &ServerContext, id: NodeId) -> String {
82 scx.extends.shared().await.node_name(id)
83 }
84
85 #[cfg(feature = "grpc")]
86 #[inline]
87 pub async fn new_grpc_client(
88 &self,
89 remote_addr: &str,
90 connect_timeout: Duration,
91 client_concurrency_limit: usize,
92 _batch_size: usize,
93 ) -> crate::Result<GrpcClient> {
94 GrpcClient::new(remote_addr, connect_timeout, client_concurrency_limit).await
95 }
96
97 #[cfg(feature = "grpc")]
98 pub fn start_grpc_server(
99 &self,
100 scx: ServerContext,
101 server_addr: std::net::SocketAddr,
102 reuseaddr: bool,
103 reuseport: bool,
104 ) {
105 tokio::spawn(async move {
106 if let Err(e) = GrpcServer::new(scx).listen_and_serve(server_addr, reuseaddr, reuseport).await {
107 log::error!("listen and serve failure, {e:?}, laddr: {server_addr:?}");
108 }
109 });
110 }
111
112 #[inline]
113 pub async fn status(&self, scx: &ServerContext) -> NodeStatus {
114 match scx.extends.shared().await.health_status().await {
115 Ok(status) => {
116 if status.running {
117 NodeStatus::Running(1)
118 } else {
119 NodeStatus::Stop
120 }
121 }
122 Err(e) => NodeStatus::Error(e.to_string()),
123 }
124 }
125
126 #[inline]
127 fn uptime(&self) -> String {
128 to_uptime((chrono::Local::now() - self.start_time).num_seconds())
129 }
130
131 #[inline]
132 pub fn version() -> String {
133 format!("rmqtt/{VERSION}-{RUSTC_BUILD_TIME}")
134 }
135
136 #[inline]
137 pub fn rustc_version() -> String {
138 RUSTC_VERSION.to_string()
139 }
140
141 #[inline]
142 pub async fn broker_info(&self, scx: &ServerContext) -> BrokerInfo {
143 let node_id = self.id;
144 BrokerInfo {
145 version: Self::version(),
146 rustc_version: Self::rustc_version(),
147 uptime: self.uptime(),
148 sysdescr: "RMQTT Broker".into(),
149 node_status: self.status(scx).await,
150 node_id,
151 node_name: self.name(scx, node_id).await, datetime: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
153 }
154 }
155
156 #[inline]
157 pub async fn node_info(&self, scx: &ServerContext) -> NodeInfo {
158 let node_id = self.id;
159
160 let sys = systemstat::System::new();
161 let boottime = sys.boot_time().map(|t| t.to_string()).unwrap_or_default();
162 let loadavg = sys.load_average();
163 let mem_info = sys.memory();
164
165 let (disk_total, disk_free) = if let Ok(mounts) = sys.mounts() {
166 let total = mounts.iter().map(|m| m.total.as_u64()).sum();
167 let free = mounts.iter().map(|m| m.free.as_u64()).sum();
168 (total, free)
169 } else {
170 (0, 0)
171 };
172
173 NodeInfo {
174 connections: scx.connections.count(),
175 boottime,
176 load1: loadavg.as_ref().map(|l| l.one).unwrap_or_default(),
177 load5: loadavg.as_ref().map(|l| l.five).unwrap_or_default(),
178 load15: loadavg.as_ref().map(|l| l.fifteen).unwrap_or_default(),
179 memory_total: mem_info.as_ref().map(|m| m.total.as_u64()).unwrap_or_default(),
180 memory_free: mem_info.as_ref().map(|m| m.free.as_u64()).unwrap_or_default(),
181 memory_used: mem_info
182 .as_ref()
183 .map(|m| systemstat::saturating_sub_bytes(m.total, m.free).as_u64())
184 .unwrap_or_default(),
185 disk_total,
186 disk_free,
187 node_status: self.status(scx).await,
188 node_id,
189 node_name: self.name(scx, node_id).await, uptime: self.uptime(),
191 version: Self::version(),
192 rustc_version: Self::rustc_version(),
193 }
194 }
195
196 #[inline]
197 fn _is_busy(&self) -> bool {
198 let sys = systemstat::System::new();
199 let cpuload = self.cpuload();
200
201 let loadavg = sys.load_average();
202 let load1 = loadavg.as_ref().map(|l| l.one).unwrap_or_default();
203
204 load1 > self.max_busy_loadavg || cpuload > self.max_busy_cpuloadavg
205 }
206
207 #[inline]
208 pub fn sys_is_busy(&self) -> bool {
209 static CACHED_BUSY: AtomicBool = AtomicBool::new(false);
210 static CACHED_TIME: AtomicI64 = AtomicI64::new(0);
211
212 let now = timestamp_millis();
213
214 let last_update = CACHED_TIME.load(Ordering::Relaxed);
215
216 if now - last_update < self.busy_update_interval.as_millis() as TimestampMillis {
217 return CACHED_BUSY.load(Ordering::Relaxed);
218 }
219
220 let busy = self._is_busy();
221
222 CACHED_BUSY.store(busy, Ordering::Relaxed);
223 CACHED_TIME.store(now, Ordering::Relaxed);
224
225 busy
226 }
227
228 #[inline]
229 pub fn cpuload(&self) -> f32 {
230 self.cpuload.load(Ordering::SeqCst) as f32 / 100.0
232 }
233
234 pub async fn update_cpuload(&self) {
235 let sys = systemstat::System::new();
236 let cpuload_aggr = sys.cpu_load_aggregate().ok();
237 tokio::time::sleep(Duration::from_secs(2)).await;
238 let cpuload_aggr = cpuload_aggr.and_then(|dm| dm.done().ok());
239 let cpuload = cpuload_aggr
240 .map(|cpuload_aggr| {
241 let aggregate1 =
242 cpuload_aggr.user + cpuload_aggr.nice + cpuload_aggr.system + cpuload_aggr.interrupt;
243 aggregate1 * 100.0
244 })
245 .unwrap_or_default();
246 self.cpuload.store(cpuload as i64, Ordering::SeqCst);
247 }
248}
249
250#[derive(Serialize, Deserialize, Clone, Debug)]
251pub struct BrokerInfo {
252 pub version: String,
253 pub rustc_version: String,
254 pub uptime: String,
255 pub sysdescr: String,
256 pub node_status: NodeStatus,
257 pub node_id: NodeId,
258 pub node_name: String,
259 pub datetime: String,
260}
261
262impl BrokerInfo {
263 pub fn to_json(&self) -> serde_json::Value {
264 json!({
265 "version": self.version,
266 "rustc_version": self.rustc_version,
267 "uptime": self.uptime,
268 "sysdescr": self.sysdescr,
269 "running": self.node_status.is_running(),
270 "node_id": self.node_id,
271 "node_name": self.node_name,
272 "datetime": self.datetime
273 })
274 }
275}
276
277#[derive(Serialize, Deserialize, Clone, Debug, Default)]
278pub struct NodeInfo {
279 pub connections: isize,
280 pub boottime: String,
281 pub load1: f32,
282 pub load5: f32,
283 pub load15: f32,
284 pub memory_total: u64,
288 pub memory_used: u64,
289 pub memory_free: u64,
290 pub disk_total: u64,
291 pub disk_free: u64,
292 pub node_status: NodeStatus,
296 pub node_id: NodeId,
297 pub node_name: String,
298 pub uptime: String,
299 pub version: String,
300 pub rustc_version: String,
301}
302
303impl NodeInfo {
304 #[inline]
305 pub fn to_json(&self) -> serde_json::Value {
306 json!({
307 "connections": self.connections,
308 "boottime": self.boottime,
309 "load1": self.load1,
310 "load5": self.load5,
311 "load15": self.load15,
312 "memory_total": self.memory_total,
316 "memory_used": self.memory_used,
317 "memory_free": self.memory_free,
318 "disk_total": self.disk_total,
319 "disk_free": self.disk_free,
320 "running": self.node_status.is_running(),
324 "node_id": self.node_id,
325 "node_name": self.node_name,
326 "uptime": self.uptime,
327 "version": self.version,
328 "rustc_version": self.rustc_version,
329 })
330 }
331
332 #[inline]
333 pub fn add(&mut self, other: &NodeInfo) {
334 self.load1 += other.load1;
335 self.load5 += other.load5;
336 self.load15 += other.load15;
337 self.memory_total += other.memory_total;
338 self.memory_used += other.memory_used;
339 self.memory_free += other.memory_free;
340 self.disk_total += other.disk_total;
341 self.disk_free += other.disk_free;
342 self.node_status = {
343 let c = match (&self.node_status, &other.node_status) {
344 (NodeStatus::Running(c1), NodeStatus::Running(c2)) => *c1 + *c2,
345 (NodeStatus::Running(c1), _) => *c1,
346 (_, NodeStatus::Running(c2)) => *c2,
347 (_, _) => 0,
348 };
349 NodeStatus::Running(c)
350 };
351 }
352}
353
354#[derive(Serialize, Deserialize, Clone, Debug)]
355#[serde(rename_all = "lowercase")]
356pub enum NodeStatus {
357 Running(usize),
358 Stop,
359 Error(String),
360}
361
362impl NodeStatus {
363 #[inline]
364 pub fn running(&self) -> usize {
365 if let NodeStatus::Running(c) = self {
366 *c
367 } else {
368 0
369 }
370 }
371
372 #[inline]
373 pub fn is_running(&self) -> bool {
374 matches!(self, NodeStatus::Running(_))
375 }
376}
377
378impl Default for NodeStatus {
379 #[inline]
380 fn default() -> Self {
381 NodeStatus::Stop
382 }
383}
384
/// Formats a duration given in whole seconds as
/// `"<d> days <h> hours, <m> minutes, <s> seconds"`.
///
/// Division truncates toward zero, so a negative input yields negative components.
#[inline]
pub fn to_uptime(uptime: i64) -> String {
    const SECS_PER_MINUTE: i64 = 60;
    const SECS_PER_HOUR: i64 = 60 * SECS_PER_MINUTE;
    const SECS_PER_DAY: i64 = 24 * SECS_PER_HOUR;

    let days = uptime / SECS_PER_DAY;
    let hours = (uptime / SECS_PER_HOUR) % 24;
    let minutes = (uptime / SECS_PER_MINUTE) % 60;
    let seconds = uptime % SECS_PER_MINUTE;

    format!("{days} days {hours} hours, {minutes} minutes, {seconds} seconds")
}