use std::time::Duration;
use anyhow::anyhow;
use serde::{de, ser, Deserialize, Serialize};
use rmqtt::types::NodeHealthStatus;
use rmqtt::{
codec::v5::PublishProperties,
metrics::Metrics,
node::{BrokerInfo, NodeInfo, NodeStatus},
plugin::PluginInfo,
stats::Stats,
types::{ClientId, HashMap, NodeId, QoS, Timestamp, TopicFilter, TopicName, UserName},
utils::{deserialize_datetime_option, format_timestamp, serialize_datetime_option},
Result,
};
/// Request half of the API message protocol.
///
/// A value is turned into bytes by `encode` and rebuilt by `decode` (both go through
/// bincode). Every variant here has a same-named variant in `MessageReply`.
/// Variants with a `&'a str` field hold a borrowed string rather than an owned one.
///
/// NOTE(review): bincode presumably identifies a variant by its position in this list, so
/// reordering or inserting variants would change the encoding — confirm before editing.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Message<'a> {
/// Broker information request; carries no arguments.
BrokerInfo,
/// Node information request; carries no arguments.
NodeInfo,
/// Node health status request; carries no arguments.
NodeHealthStatus,
/// Statistics request; carries no arguments.
StatsInfo,
/// Metrics request; carries no arguments.
MetricsInfo,
/// Client search driven by the boxed `ClientSearchParams`.
ClientSearch(Box<ClientSearchParams>),
/// Request for a single client, identified by `clientid`.
ClientGet { clientid: &'a str },
/// Subscribe request described by `SubscribeParams`.
Subscribe(SubscribeParams),
/// Unsubscribe request described by `UnsubscribeParams`.
Unsubscribe(UnsubscribeParams),
/// Request for the plugin list; carries no arguments.
GetPlugins,
/// Request for a single plugin, identified by `name`.
GetPlugin { name: &'a str },
/// Request for the configuration of the plugin called `name`.
GetPluginConfig { name: &'a str },
/// Request to reload the configuration of the plugin called `name`.
ReloadPluginConfig { name: &'a str },
/// Request to load the plugin called `name`.
LoadPlugin { name: &'a str },
/// Request to unload the plugin called `name`.
UnloadPlugin { name: &'a str },
}
impl Message<'_> {
#[inline]
pub fn encode(&self) -> Result<Vec<u8>> {
bincode::serialize(self).map_err(anyhow::Error::new)
}
#[inline]
pub fn decode(data: &[u8]) -> Result<Message<'_>> {
bincode::deserialize::<Message>(data).map_err(anyhow::Error::new)
}
}
/// Reply half of the API message protocol; the variants mirror `Message` by name.
///
/// Values are encoded and decoded with bincode through `encode` / `decode`.
///
/// NOTE(review): bincode presumably identifies a variant by its position in this list, so
/// reordering or inserting variants would change the encoding — confirm before editing.
#[derive(Serialize, Deserialize, Debug)]
pub enum MessageReply {
/// Carries a `BrokerInfo`.
BrokerInfo(BrokerInfo),
/// Carries a `NodeInfo`.
NodeInfo(NodeInfo),
/// Carries a `NodeHealthStatus`.
NodeHealthStatus(NodeHealthStatus),
/// Carries a `NodeStatus` together with boxed `Stats`.
StatsInfo(NodeStatus, Box<Stats>),
/// Carries boxed `Metrics`.
MetricsInfo(Box<Metrics>),
/// Carries the list of matching client records.
ClientSearch(Vec<ClientSearchResult>),
/// Carries one client record, or `None`.
ClientGet(Option<ClientSearchResult>),
/// One entry per topic filter.
/// NOTE(review): the tuple is presumably (succeeded, failure reason) — confirm against
/// the code that builds this map.
Subscribe(HashMap<TopicFilter, (bool, Option<String>)>),
/// Unsubscribe reply; carries no data.
Unsubscribe,
/// Carries the list of `PluginInfo` entries.
GetPlugins(Vec<PluginInfo>),
/// Carries one `PluginInfo`, or `None`.
GetPlugin(Option<PluginInfo>),
/// Plugin configuration as raw bytes.
/// NOTE(review): the byte format is not visible in this file.
GetPluginConfig(Vec<u8>),
/// Reload-configuration reply; carries no data.
ReloadPluginConfig,
/// Load-plugin reply; carries no data.
LoadPlugin,
/// NOTE(review): the meaning of the flag is not visible here — presumably whether the
/// plugin was actually unloaded; confirm with the producer.
UnloadPlugin(bool),
}
impl MessageReply {
#[inline]
pub fn encode(&self) -> Result<Vec<u8>> {
bincode::serialize(self).map_err(anyhow::Error::new)
}
#[inline]
pub fn decode(data: &[u8]) -> Result<MessageReply> {
bincode::deserialize::<MessageReply>(data).map_err(anyhow::Error::new)
}
}
/// Filter criteria for a client search (the payload of `Message::ClientSearch`).
///
/// Every `Option` field is `None` in the derived `Default`.
///
/// NOTE(review): the `_like_`, `_gte_` and `_lte_` prefixes presumably stand for substring
/// match, lower bound and upper bound; the code that applies these filters is not in this
/// file — confirm there.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct ClientSearchParams {
/// `#[serde(default)]`: a missing value deserializes to 0.
/// NOTE(review): presumably the maximum number of results — confirm with the consumer.
#[serde(default)]
pub _limit: usize,
/// Optional client id filter.
pub clientid: Option<String>,
/// Optional user name filter.
pub username: Option<String>,
/// Optional IP address filter, as text.
pub ip_address: Option<String>,
/// Optional filter on the `connected` flag.
pub connected: Option<bool>,
/// Optional filter on the `clean_start` flag.
pub clean_start: Option<bool>,
/// Optional filter on the `session_present` flag.
pub session_present: Option<bool>,
/// Optional filter on the protocol version number.
pub proto_ver: Option<u8>,
/// Optional `_like_` filter on the client id.
pub _like_clientid: Option<String>,
/// Optional `_like_` filter on the user name.
pub _like_username: Option<String>,
/// Optional `_gte_` bound on `created_at`. Read and written through the
/// `*_datetime_option` helpers; a missing field becomes `None`.
#[serde(
default,
deserialize_with = "deserialize_datetime_option",
serialize_with = "serialize_datetime_option"
)]
pub _gte_created_at: Option<Duration>,
/// Optional `_lte_` bound on `created_at`; same (de)serialization as `_gte_created_at`.
#[serde(
default,
deserialize_with = "deserialize_datetime_option",
serialize_with = "serialize_datetime_option"
)]
pub _lte_created_at: Option<Duration>,
/// Optional `_gte_` bound on `connected_at`; same (de)serialization as `_gte_created_at`.
#[serde(
default,
deserialize_with = "deserialize_datetime_option",
serialize_with = "serialize_datetime_option"
)]
pub _gte_connected_at: Option<Duration>,
/// Optional `_lte_` bound on `connected_at`; same (de)serialization as `_gte_created_at`.
#[serde(
default,
deserialize_with = "deserialize_datetime_option",
serialize_with = "serialize_datetime_option"
)]
pub _lte_connected_at: Option<Duration>,
/// Optional `_gte_` bound on the message queue length.
pub _gte_mqueue_len: Option<usize>,
/// Optional `_lte_` bound on the message queue length.
pub _lte_mqueue_len: Option<usize>, }
/// One client record, as carried by `MessageReply::ClientSearch` and
/// `MessageReply::ClientGet`.
///
/// `to_json` renders the same fields as a JSON object, formatting the three timestamp
/// fields with `format_timestamp`.
#[derive(Deserialize, Serialize, Debug, Default)]
pub struct ClientSearchResult {
/// Id of the node this record refers to.
pub node_id: NodeId,
/// Client id.
pub clientid: ClientId,
/// User name.
pub username: UserName,
/// Superuser flag.
pub superuser: bool,
/// Protocol version number.
pub proto_ver: u8,
/// Client IP address as text, when present.
pub ip_address: Option<String>,
/// Client port, when present.
pub port: Option<u16>,
/// Connected flag.
pub connected: bool,
/// Connection timestamp. NOTE(review): unit is whatever `Timestamp` uses — not visible here.
pub connected_at: Timestamp,
/// Disconnection timestamp; same unit as `connected_at`.
pub disconnected_at: Timestamp,
/// Disconnect reason as free text.
pub disconnected_reason: String,
/// Keep-alive value. NOTE(review): presumably seconds — confirm.
pub keepalive: u16,
/// Clean-start flag.
pub clean_start: bool,
/// Session-present flag.
pub session_present: bool,
/// Session expiry interval. NOTE(review): unit not visible here.
pub expiry_interval: i64,
/// Creation timestamp; same unit as `connected_at`.
pub created_at: Timestamp,
/// Current number of subscriptions.
pub subscriptions_cnt: usize,
/// Subscription limit.
pub max_subscriptions: usize,
/// Last-will data as a JSON value. It is not (de)serialized directly: `serialize_last_will`
/// writes it as JSON-encoded bytes and `deserialize_last_will` parses those bytes back.
/// `default` lets a missing field fall back to the `serde_json::Value` default.
#[serde(
default,
serialize_with = "ClientSearchResult::serialize_last_will",
deserialize_with = "ClientSearchResult::deserialize_last_will"
)]
pub last_will: serde_json::Value,
/// Current in-flight count.
pub inflight: usize,
/// In-flight limit.
pub max_inflight: u16,
/// Current message queue length.
pub mqueue_len: usize,
/// Message queue limit.
pub max_mqueue: usize,
}
impl ClientSearchResult {
#[inline]
fn serialize_last_will<S>(last_will: &serde_json::Value, s: S) -> std::result::Result<S::Ok, S::Error>
where
S: ser::Serializer,
{
serde_json::to_vec(last_will).map_err(ser::Error::custom)?.serialize(s)
}
#[inline]
pub fn deserialize_last_will<'de, D>(d: D) -> std::result::Result<serde_json::Value, D::Error>
where
D: de::Deserializer<'de>,
{
serde_json::from_slice(&Vec::deserialize(d)?).map_err(de::Error::custom)
}
#[inline]
pub fn to_json(&self) -> serde_json::Value {
let data = serde_json::json!({
"node_id": self.node_id,
"clientid": self.clientid,
"username": self.username,
"superuser": self.superuser,
"proto_ver": self.proto_ver,
"ip_address": self.ip_address,
"port": self.port,
"connected": self.connected,
"connected_at": format_timestamp(self.connected_at),
"disconnected_at": format_timestamp(self.disconnected_at),
"disconnected_reason": self.disconnected_reason,
"keepalive": self.keepalive,
"clean_start": self.clean_start,
"session_present": self.session_present,
"expiry_interval": self.expiry_interval,
"created_at": format_timestamp(self.created_at),
"subscriptions_cnt": self.subscriptions_cnt,
"max_subscriptions": self.max_subscriptions,
"last_will": self.last_will,
"inflight": self.inflight,
"max_inflight": self.max_inflight,
"mqueue_len": self.mqueue_len,
"max_mqueue": self.max_mqueue,
});
data
}
}
/// Parameters of a publish request.
///
/// The `#[serde(default = ...)]` fallbacks below apply only when a field is missing during
/// deserialization; the derived `Default` impl uses each field type's own default instead
/// (so, for example, `encoding` is `""` there, not `"plain"`).
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct PublishParams {
/// Optional single topic.
pub topic: Option<TopicName>,
/// Optional second topic field.
/// NOTE(review): presumably a comma-separated list, as in `SubscribeParams::topics` —
/// the code that reads it is not in this file; confirm.
pub topics: Option<TopicName>,
/// Publishing client id; falls back to `"system"` when missing from the input.
#[serde(default = "PublishParams::clientid_default")]
pub clientid: ClientId,
/// Message payload as text; interpretation depends on `encoding`.
pub payload: String,
/// Payload encoding name; falls back to `"plain"` when missing from the input.
/// NOTE(review): the set of accepted values is not visible here.
#[serde(default = "PublishParams::encoding_default")]
pub encoding: String,
/// QoS level as a raw number; falls back to 0 when missing from the input.
#[serde(default = "PublishParams::qos_default")]
pub qos: u8,
/// Retain flag; falls back to `false` when missing from the input.
#[serde(default = "PublishParams::retain_default")]
pub retain: bool,
/// Optional MQTT v5 publish properties.
pub properties: Option<PublishProperties>,
}
impl PublishParams {
fn clientid_default() -> ClientId {
"system".into()
}
fn encoding_default() -> String {
"plain".into()
}
fn qos_default() -> u8 {
0
}
fn retain_default() -> bool {
false
}
}
/// Parameters of a subscribe request (the payload of `Message::Subscribe`).
///
/// Use the `topics()` method to obtain the combined filter list and `qos()` to obtain the
/// validated QoS level.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct SubscribeParams {
/// Optional single topic filter; `topics()` appends it after the entries from `topics`.
pub topic: Option<TopicFilter>,
/// Optional comma-separated list of topic filters; `topics()` splits it on `,` and trims
/// each entry.
pub topics: Option<TopicFilter>,
/// Id of the client the subscription is for.
pub clientid: ClientId,
/// QoS level as a raw number; falls back to 0 when missing from the input.
#[serde(default = "SubscribeParams::qos_default")]
pub qos: u8,
}
impl SubscribeParams {
fn qos_default() -> u8 {
0
}
#[inline]
pub fn topics(&self) -> Result<Vec<TopicFilter>> {
let mut topics = if let Some(topics) = &self.topics {
topics.split(',').collect::<Vec<_>>().iter().map(|t| TopicName::from(t.trim())).collect()
} else {
Vec::new()
};
if let Some(topic) = &self.topic {
topics.push(topic.clone());
}
if topics.is_empty() {
return Err(anyhow!("topics or topic is empty"));
}
Ok(topics)
}
#[inline]
pub fn qos(&self) -> Result<QoS> {
QoS::try_from(self.qos).map_err(|e| anyhow!(e))
}
}
/// Parameters of an unsubscribe request (the payload of `Message::Unsubscribe`).
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct UnsubscribeParams {
/// The topic filter to unsubscribe from.
pub topic: TopicFilter,
/// Id of the client whose subscription is removed.
pub clientid: ClientId,
}
/// Selector for Prometheus data.
///
/// NOTE(review): the code that interprets this selector is not in this file; the variant
/// descriptions below are inferred from the names — confirm with the consumer.
#[derive(Deserialize, Serialize, Debug, Copy, Clone, Hash, Eq, PartialEq)]
pub enum PrometheusDataType {
/// Presumably data for all nodes.
All,
/// Presumably data for the single node with the given `NodeId`.
Node(NodeId),
/// Presumably data aggregated across nodes.
Sum,
}