use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::context::ServerContext;
use crate::context::TaskExecStats;
use crate::types::{HashMap, NodeId};
use crate::utils::Counter;
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Stats {
pub handshakings: Counter,
pub handshakings_active: Counter,
pub handshakings_rate: Counter,
pub connections: Counter,
pub sessions: Counter,
pub subscriptions: Counter,
pub subscriptions_shared: Counter,
pub message_queues: Counter,
pub out_inflights: Counter,
pub in_inflights: Counter,
pub forwards: Counter,
pub message_storages: Counter,
pub retaineds: Counter,
pub delayed_publishs: Counter,
pub grpc_server_actives: Counter,
pub grpc_clients_actives: HashMap<NodeId, Counter>,
pub topics_map: HashMap<NodeId, Counter>,
pub routes_map: HashMap<NodeId, Counter>,
pub execs_actives: HashMap<String, TaskExecStats>,
#[cfg(feature = "debug")]
debug_client_states_map: HashMap<NodeId, usize>,
#[cfg(feature = "debug")]
debug_topics_tree_map: HashMap<NodeId, usize>,
#[cfg(feature = "debug")]
debug_shared_peers: Counter,
#[cfg(feature = "debug")]
debug_subscriptions: usize,
#[cfg(feature = "debug")]
pub debug_session_channels: Counter,
}
impl Stats {
#[inline]
pub fn new() -> Self {
Self {
handshakings: Counter::new(),
handshakings_active: Counter::new(),
handshakings_rate: Counter::new(),
connections: Counter::new(),
sessions: Counter::new(),
subscriptions: Counter::new(),
subscriptions_shared: Counter::new(),
message_queues: Counter::new(),
out_inflights: Counter::new(),
in_inflights: Counter::new(),
forwards: Counter::new(),
message_storages: Counter::new(),
retaineds: Counter::new(),
delayed_publishs: Counter::new(),
grpc_server_actives: Counter::new(),
grpc_clients_actives: HashMap::default(),
topics_map: HashMap::default(),
routes_map: HashMap::default(),
execs_actives: HashMap::default(),
#[cfg(feature = "debug")]
debug_client_states_map: HashMap::default(),
#[cfg(feature = "debug")]
debug_topics_tree_map: HashMap::default(),
#[cfg(feature = "debug")]
debug_shared_peers: Counter::new(),
#[cfg(feature = "debug")]
debug_subscriptions: 0,
#[cfg(feature = "debug")]
debug_session_channels: Counter::new(),
}
}
#[inline]
pub async fn clone(&self, scx: &ServerContext) -> Self {
let node_id = scx.node.id;
let mut topics_map = HashMap::default();
let mut routes_map = HashMap::default();
{
let router = scx.extends.router().await;
topics_map.insert(node_id, router.topics());
routes_map.insert(node_id, router.routes());
}
#[allow(unused_mut)]
let mut grpc_clients_actives = HashMap::default();
#[cfg(feature = "grpc")]
{
let shared = scx.extends.shared().await;
for (id, (_, grpc_client)) in shared.get_grpc_clients().iter() {
grpc_clients_actives.insert(*id, grpc_client.active_tasks().clone());
}
}
let mut execs_actives = HashMap::default();
for (key, exec) in scx.execs() {
execs_actives.insert(key, TaskExecStats::from_exec(&exec).await);
}
self.handshakings.set(&scx.handshakings);
self.handshakings_active.current_set(scx.handshake_exec.active_count());
self.handshakings_rate.sets((scx.handshake_exec.get_rate().await * 100.0) as isize);
self.connections.set(&scx.connections);
self.sessions.set(&scx.sessions);
#[cfg(feature = "msgstore")]
{
let message_mgr = scx.extends.message_mgr().await;
self.message_storages.current_set(message_mgr.count().await);
self.message_storages.max_max(message_mgr.max().await);
}
#[cfg(feature = "retain")]
let retaineds = {
let retain = scx.extends.retain().await;
Counter::new_with(retain.count().await, retain.max().await, retain.stats_merge_mode())
};
#[cfg(not(feature = "retain"))]
let retaineds = Counter::default();
#[cfg(feature = "delayed")]
{
let delayed_sender = scx.extends.delayed_sender().await;
self.delayed_publishs.current_set(delayed_sender.len().await as isize);
}
#[cfg(feature = "debug")]
let shared = scx.extends.shared().await;
#[cfg(feature = "debug")]
let mut debug_client_states_map = HashMap::default();
#[cfg(feature = "debug")]
let mut debug_topics_tree_map = HashMap::default();
#[cfg(feature = "debug")]
{
debug_client_states_map.insert(node_id, shared.client_states_count().await);
debug_topics_tree_map.insert(node_id, scx.extends.router().await.topics_tree().await);
}
#[cfg(feature = "debug")]
self.debug_shared_peers.current_set(shared.sessions_count() as isize);
#[cfg(feature = "debug")]
let debug_subscriptions = shared.subscriptions_count().await;
Self {
handshakings: self.handshakings.clone(),
handshakings_active: self.handshakings_active.clone(),
handshakings_rate: self.handshakings_rate.clone(),
connections: self.connections.clone(),
sessions: self.sessions.clone(),
subscriptions: self.subscriptions.clone(),
subscriptions_shared: self.subscriptions_shared.clone(),
message_queues: self.message_queues.clone(),
out_inflights: self.out_inflights.clone(),
in_inflights: self.in_inflights.clone(),
forwards: self.forwards.clone(),
message_storages: self.message_storages.clone(),
delayed_publishs: self.delayed_publishs.clone(),
grpc_server_actives: self.grpc_server_actives.clone(),
grpc_clients_actives,
retaineds,
topics_map,
routes_map,
execs_actives,
#[cfg(feature = "debug")]
debug_client_states_map,
#[cfg(feature = "debug")]
debug_topics_tree_map,
#[cfg(feature = "debug")]
debug_shared_peers: self.debug_shared_peers.clone(),
#[cfg(feature = "debug")]
debug_subscriptions,
#[cfg(feature = "debug")]
debug_session_channels: self.debug_session_channels.clone(),
}
}
#[inline]
pub fn add(&mut self, other: Self) {
self.handshakings.add(&other.handshakings);
self.handshakings_active.add(&other.handshakings_active);
self.handshakings_rate.add(&other.handshakings_rate);
self.connections.add(&other.connections);
self.sessions.add(&other.sessions);
self.subscriptions.add(&other.subscriptions);
self.subscriptions_shared.add(&other.subscriptions_shared);
self.message_queues.add(&other.message_queues);
self.out_inflights.add(&other.out_inflights);
self.in_inflights.add(&other.in_inflights);
self.forwards.add(&other.forwards);
self.message_storages.add(&other.message_storages);
self.retaineds.merge(&other.retaineds);
self.delayed_publishs.merge(&other.delayed_publishs);
self.grpc_server_actives.merge(&other.grpc_server_actives);
self.grpc_clients_actives.extend(other.grpc_clients_actives);
self.topics_map.extend(other.topics_map);
self.routes_map.extend(other.routes_map);
for (k, v) in other.execs_actives {
self.execs_actives.entry(k).and_modify(|tes| tes.add(&v)).or_insert_with(|| v);
}
#[cfg(feature = "debug")]
{
self.debug_client_states_map.extend(other.debug_client_states_map);
self.debug_topics_tree_map.extend(other.debug_topics_tree_map);
self.debug_shared_peers.add(&other.debug_shared_peers);
self.debug_subscriptions += other.debug_subscriptions;
self.debug_session_channels.add(&other.debug_session_channels);
}
}
#[allow(unused_mut)]
#[inline]
pub async fn to_json(&self, scx: &ServerContext) -> serde_json::Value {
let router = scx.extends.router().await;
let topics = router.merge_topics(&self.topics_map);
let routes = router.merge_routes(&self.routes_map);
let grpc_clients_actives = Counter::new();
for (_, c) in self.grpc_clients_actives.iter() {
grpc_clients_actives.add(c);
}
json!({
"handshakings.count": self.handshakings.count(),
"handshakings.max": self.handshakings.max(),
"handshakings_active.count": self.handshakings_active.count(),
"handshakings_rate.count": self.handshakings_rate.count() as f64 / 100.0,
"handshakings_rate.max": self.handshakings_rate.max() as f64 / 100.0,
"connections.count": self.connections.count(),
"connections.max": self.connections.max(),
"sessions.count": self.sessions.count(),
"sessions.max": self.sessions.max(),
"subscriptions.count": self.subscriptions.count(),
"subscriptions.max": self.subscriptions.max(),
"subscriptions_shared.count": self.subscriptions_shared.count(),
"subscriptions_shared.max": self.subscriptions_shared.max(),
"retaineds.count": self.retaineds.count(),
"retaineds.max": self.retaineds.max(),
"message_queues.count": self.message_queues.count(),
"message_queues.max": self.message_queues.max(),
"out_inflights.count": self.out_inflights.count(),
"out_inflights.max": self.out_inflights.max(),
"in_inflights.count": self.in_inflights.count(),
"in_inflights.max": self.in_inflights.max(),
"forwards.count": self.forwards.count(),
"forwards.max": self.forwards.max(),
"message_storages.count": self.message_storages.count(),
"message_storages.max": self.message_storages.max(),
"delayed_publishs.count": self.delayed_publishs.count(),
"delayed_publishs.max": self.delayed_publishs.max(),
"topics.count": topics.count(),
"topics.max": topics.max(),
"routes.count": routes.count(),
"routes.max": routes.max(),
})
}
#[allow(unused_mut)]
#[inline]
pub async fn to_sys_json(&self, _scx: &ServerContext) -> serde_json::Value {
let grpc_clients_actives = Counter::new();
for (_, c) in self.grpc_clients_actives.iter() {
grpc_clients_actives.add(c);
}
let mut json_val = json!({
"grpc_server_actives.count": self.grpc_server_actives.count(),
"grpc_server_actives.max": self.grpc_server_actives.max(),
"grpc_clients_actives.count": grpc_clients_actives.count(),
"grpc_clients_actives.max": grpc_clients_actives.max(),
"execs_actives": self.execs_actives,
});
#[cfg(feature = "debug")]
{
if let Some(obj) = json_val.as_object_mut() {
obj.insert("debug_grpc_clients_actives".into(), json!(self.grpc_clients_actives));
obj.insert("debug_client_states_map".into(), json!(self.debug_client_states_map));
obj.insert("debug_topics_tree_map".into(), json!(self.debug_topics_tree_map));
obj.insert("debug_shared_peers.count".into(), json!(self.debug_shared_peers.count()));
obj.insert("debug_subscriptions.count".into(), json!(self.debug_subscriptions));
obj.insert("debug_session_channels.count".into(), json!(self.debug_session_channels.count()));
obj.insert("debug_session_channels.max".into(), json!(self.debug_session_channels.max()));
}
}
json_val
}
}