use std::sync::Arc;
use std::io::{Error, Result, ErrorKind};
use mqtt311::{QoS, Packet, Publish};
use dashmap::{DashMap,
mapref::entry::Entry};
use tcp::{Socket,
connect::TcpSocket,
tls_connect::TlsSocket};
use ws::{connect::WsSocket,
utils::ChildProtocol};
use quic::{SocketHandle, AsyncService};
use crate::{v311::{self, WsMqtt311},
tls_v311::{self, WssMqtt311},
broker::{MqttBrokerListener, MqttBrokerService},
session::MqttSession,
quic_v311::{self, QuicMqtt311},
quic_broker::{MqttBrokerListener as QuicMqttBrokerListener, MqttBrokerService as QuicMqttBrokerService},
quic_session::MqttSession as QuicMqttSession};
lazy_static! {
static ref MQTT_BROKERS: DashMap<String, MqttBrokerProtocol> = DashMap::default();
static ref MQTT_BROKERS_MAP: DashMap<u16, String> = DashMap::default();
}
lazy_static! {
static ref QUIC_MQTT_BROKERS: DashMap<String, MqttBrokerProtocol> = DashMap::default();
static ref QUIC_MQTT_BROKERS_MAP: DashMap<u16, String> = DashMap::default();
}
pub fn get_broker_name(port: u16) -> Option<String> {
if let Some(item) = MQTT_BROKERS_MAP.get(&port) {
Some(item.value().clone())
} else {
None
}
}
pub fn get_quic_broker_name(port: u16) -> Option<String> {
if let Some(item) = QUIC_MQTT_BROKERS_MAP.get(&port) {
Some(item.value().clone())
} else {
None
}
}
pub fn get_broker(broker_name: &String) -> Option<MqttBrokerProtocol> {
if let Some(item) = MQTT_BROKERS.get(broker_name) {
Some(item.value().clone())
} else {
None
}
}
pub fn all_broker() -> Vec<(String, MqttBrokerProtocol)> {
let mut brokers = Vec::with_capacity(MQTT_BROKERS.len());
for broker in MQTT_BROKERS.iter() {
brokers.push((broker.key().clone(),
broker.value().clone()));
}
brokers
}
pub fn get_quic_broker(broker_name: &String) -> Option<MqttBrokerProtocol> {
if let Some(item) = QUIC_MQTT_BROKERS.get(broker_name) {
Some(item.value().clone())
} else {
None
}
}
pub fn all_quic_broker() -> Vec<(String, MqttBrokerProtocol)> {
let mut brokers = Vec::with_capacity(MQTT_BROKERS.len());
for broker in QUIC_MQTT_BROKERS.iter() {
brokers.push((broker.key().clone(),
broker.value().clone()));
}
brokers
}
pub fn register_mqtt_listener(name: &str,
listener: Arc<dyn MqttBrokerListener<TcpSocket>>) -> bool {
if let Some(item) = MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::WsMqtt311(broker) => {
broker
.get_broker()
.register_listener(listener);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
pub fn register_mqtts_listener(name: &str,
listener: Arc<dyn MqttBrokerListener<TlsSocket>>) -> bool {
if let Some(item) = MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::WssMqtt311(broker) => {
broker
.get_broker()
.register_listener(listener);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
pub fn register_quic_mqtt_listener(name: &str,
listener: Arc<dyn QuicMqttBrokerListener>) -> bool {
if let Some(item) = QUIC_MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::QuicMqtt311(broker) => {
broker
.get_broker()
.register_listener(listener);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
pub fn register_mqtt_service(name: &str,
service: Arc<dyn MqttBrokerService<TcpSocket>>) -> bool {
if let Some(item) = MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::WsMqtt311(broker) => {
broker
.get_broker()
.register_service(service);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
pub fn register_mqtts_service(name: &str,
service: Arc<dyn MqttBrokerService<TlsSocket>>) -> bool {
if let Some(item) = MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::WssMqtt311(broker) => {
broker
.get_broker()
.register_service(service);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
pub fn register_quic_mqtt_service(name: &str,
service: Arc<dyn QuicMqttBrokerService>) -> bool {
if let Some(item) = QUIC_MQTT_BROKERS.get(&name.to_string()) {
match item.value() {
MqttBrokerProtocol::QuicMqtt311(broker) => {
broker
.get_broker()
.register_service(service);
return true;
},
_ => {
unimplemented!();
},
}
}
false
}
#[derive(Clone)]
pub enum MqttBrokerProtocol {
WsMqtt311(Arc<WsMqtt311>), WssMqtt311(Arc<WssMqtt311>), QuicMqtt311(Arc<QuicMqtt311>), }
impl MqttBrokerProtocol {
pub fn get_broker_name(&self) -> &str {
match self {
MqttBrokerProtocol::WsMqtt311(broker) => broker.get_broker_name(),
MqttBrokerProtocol::WssMqtt311(broker) => broker.get_broker_name(),
MqttBrokerProtocol::QuicMqtt311(borker) => borker.get_broker_name(),
}
}
}
pub struct WsMqttBrokerFactory {
protocol_name: String, broker_name: String, broker_port: u16, }
impl WsMqttBrokerFactory {
pub fn new(protocol_name: &str,
broker_name: &str,
broker_port: u16,
is_strict: bool) -> Self {
let broker = Arc::new(WsMqtt311::with_name(protocol_name,
broker_name,
WsMqtt311::MAX_QOS,
is_strict));
MQTT_BROKERS
.insert(broker_name.to_string(),
MqttBrokerProtocol::WsMqtt311(broker));
MQTT_BROKERS_MAP
.insert(broker_port,
broker_name.to_string());
WsMqttBrokerFactory {
protocol_name: protocol_name.to_string(),
broker_name: broker_name.to_string(),
broker_port,
}
}
pub fn new_child_protocol(&self, is_strict: bool) -> Arc<dyn ChildProtocol<TcpSocket>> {
if let Some(item) = MQTT_BROKERS.get(&self.broker_name) {
if let MqttBrokerProtocol::WsMqtt311(broker) = item.value() {
return broker.clone();
}
}
let broker = Arc::new(
WsMqtt311::with_name(&self.protocol_name,
&self.broker_name,
WsMqtt311::MAX_QOS,
is_strict)
);
MQTT_BROKERS
.insert(self.broker_name.clone(),
MqttBrokerProtocol::WsMqtt311(broker.clone()));
MQTT_BROKERS_MAP
.insert(self.broker_port,
self.broker_name.clone());
broker
}
}
pub struct WssMqttBrokerFactory {
protocol_name: String, broker_name: String, broker_port: u16, }
impl WssMqttBrokerFactory {
pub fn new(protocol_name: &str,
broker_name: &str,
broker_port: u16,
is_strict: bool) -> Self {
let broker = Arc::new(
WssMqtt311::with_name(protocol_name,
broker_name,
WsMqtt311::MAX_QOS,
is_strict)
);
MQTT_BROKERS
.insert(broker_name.to_string(),
MqttBrokerProtocol::WssMqtt311(broker));
MQTT_BROKERS_MAP
.insert(broker_port, broker_name.to_string());
WssMqttBrokerFactory {
protocol_name: protocol_name.to_string(),
broker_name: broker_name.to_string(),
broker_port,
}
}
pub fn new_child_protocol(&self, is_strict: bool) -> Arc<dyn ChildProtocol<TlsSocket>> {
if let Some(item) = MQTT_BROKERS.get(&self.broker_name) {
if let MqttBrokerProtocol::WssMqtt311(broker) = item.value() {
return broker.clone();
}
}
let broker = Arc::new(WssMqtt311::with_name(&self.protocol_name, &self.broker_name, WsMqtt311::MAX_QOS, is_strict));
MQTT_BROKERS
.insert(self.broker_name.clone(),
MqttBrokerProtocol::WssMqtt311(broker.clone()));
MQTT_BROKERS_MAP
.insert(self.broker_port,
self.broker_name.clone());
broker
}
}
pub struct QuicMqttBrokerFactory {
broker_name: String, broker_port: u16, }
impl QuicMqttBrokerFactory {
pub fn new(broker_name: &str,
broker_port: u16) -> Self {
let broker = Arc::new(QuicMqtt311::with_name(broker_name,
QuicMqtt311::MAX_QOS));
QUIC_MQTT_BROKERS
.insert(broker_name.to_string(),
MqttBrokerProtocol::QuicMqtt311(broker));
QUIC_MQTT_BROKERS_MAP
.insert(broker_port,
broker_name.to_string());
QuicMqttBrokerFactory {
broker_name: broker_name.to_string(),
broker_port,
}
}
pub fn new_quic_service(&self) -> Arc<dyn AsyncService> {
if let Some(item) = QUIC_MQTT_BROKERS.get(&self.broker_name) {
if let MqttBrokerProtocol::QuicMqtt311(broker) = item.value() {
return broker.clone();
}
}
let broker = Arc::new(
QuicMqtt311::with_name(&self.broker_name,
QuicMqtt311::MAX_QOS)
);
QUIC_MQTT_BROKERS
.insert(self.broker_name.clone(),
MqttBrokerProtocol::QuicMqtt311(broker.clone()));
QUIC_MQTT_BROKERS_MAP
.insert(self.broker_port,
self.broker_name.clone());
broker
}
}
pub fn add_topic(broker_name: &String,
is_public: bool,
topic: String,
qos: u8,
retain: Option<Publish>) {
let broker = if let Some(broker) = get_broker(broker_name) {
broker
} else {
return;
};
match broker {
MqttBrokerProtocol::WsMqtt311(broker) => {
broker.get_broker().subscribed(is_public, &topic, qos, retain);
},
MqttBrokerProtocol::WssMqtt311(broker) => {
broker.get_broker().subscribed(is_public, &topic, qos, retain);
},
MqttBrokerProtocol::QuicMqtt311(broker) => {
broker.get_broker().subscribed(is_public, &topic, qos, retain);
},
}
}
pub fn publish_topic(broker_name: Option<String>,
is_public: bool,
topic: String,
qos: u8,
retain: Option<Publish>,
payload: Arc<Vec<u8>>) -> Result<()> {
if let Some(broker_name) = broker_name {
if let Some(broker) = get_broker(&broker_name) {
publish_to_connection(&broker,
&broker_name,
is_public,
&topic,
qos,
&retain,
&payload)
} else {
Err(Error::new(ErrorKind::Other,
format!("Mqtt broker broadcast failed, broker: {:?}, reason: broker not exist",
broker_name)))
}
} else {
for (broker_name, broker) in all_broker() {
publish_to_connection(&broker,
&broker_name,
is_public,
&topic,
qos,
&retain,
&payload)?;
}
Ok(())
}
}
fn publish_to_connection(broker: &MqttBrokerProtocol,
broker_name: &String,
is_public: bool,
topic: &String,
qos: u8,
retain: &Option<Publish>,
payload: &Arc<Vec<u8>>) -> Result<()> {
match broker {
MqttBrokerProtocol::WsMqtt311(broker) => {
if let Some(sessions) = broker.get_broker().subscribed(is_public, &topic, qos, retain.clone()) {
let mut connects: Vec<WsSocket<TcpSocket>> = Vec::with_capacity(sessions.len());
for session in sessions {
if let Some(connect) = session.get_connect() {
connects.push(connect.clone());
}
};
let packet = Packet::Publish(Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic_name: topic.clone(),
pkid: None,
payload: payload.clone(),
});
if let Err(e) = v311::broadcast_packet(&connects[..], &packet) {
return Err(Error::new(ErrorKind::BrokenPipe,
format!("Mqtt broker broadcast failed, broker: {:?}, reason: {:?}",
broker_name,
e)));
}
}
Ok(())
},
MqttBrokerProtocol::WssMqtt311(broker) => {
if let Some(sessions) = broker.get_broker().subscribed(is_public, &topic, qos, retain.clone()) {
let mut connects: Vec<WsSocket<TlsSocket>> = Vec::with_capacity(sessions.len());
for session in sessions {
if let Some(connect) = session.get_connect() {
connects.push(connect.clone());
}
};
let packet = Packet::Publish(Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic_name: topic.clone(),
pkid: None,
payload: payload.clone(),
});
if let Err(e) = tls_v311::broadcast_packet(&connects[..], &packet) {
return Err(Error::new(ErrorKind::BrokenPipe,
format!("Mqtt broker broadcast failed, broker: {:?}, reason: {:?}",
broker_name,
e)));
}
}
Ok(())
},
_ => Ok(()),
}
}
pub fn add_quic_topic(broker_name: &String,
is_public: bool,
topic: String,
qos: u8,
retain: Option<Publish>) {
let broker = if let Some(MqttBrokerProtocol::QuicMqtt311(broker)) = get_quic_broker(broker_name) {
broker
} else {
return;
};
broker.get_broker().subscribed(is_public, &topic, qos, retain);
}
pub fn publish_quic_topic(broker_name: Option<String>,
is_public: bool,
topic: String,
qos: u8,
retain: Option<Publish>,
payload: Arc<Vec<u8>>) -> Result<()> {
if let Some(broker_name) = broker_name {
if let Some(MqttBrokerProtocol::QuicMqtt311(broker)) = get_quic_broker(&broker_name) {
publish_to_quic_connection(&broker,
&broker_name,
is_public,
&topic,
qos,
&retain,
&payload)
} else {
Err(Error::new(ErrorKind::Other,
format!("Mqtt quic broker broadcast failed, broker: {:?}, topic: {:?}, reason: broker not exist",
topic,
broker_name)))
}
} else {
for (broker_name, broker) in all_quic_broker() {
if let MqttBrokerProtocol::QuicMqtt311(broker) = broker {
publish_to_quic_connection(&broker,
&broker_name,
is_public,
&topic,
qos,
&retain,
&payload)?;
}
}
Ok(())
}
}
fn publish_to_quic_connection(broker: &Arc<QuicMqtt311>,
broker_name: &String,
is_public: bool,
topic: &String,
qos: u8,
retain: &Option<Publish>,
payload: &Arc<Vec<u8>>) -> Result<()> {
if let Some(sessions) = broker.get_broker().subscribed(is_public, &topic, qos, retain.clone()) {
let mut connects: Vec<SocketHandle> = Vec::with_capacity(sessions.len());
for session in sessions {
if let Some(connect) = session.get_connect() {
connects.push(connect.clone());
}
};
let packet = Packet::Publish(Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic_name: topic.clone(),
pkid: None,
payload: payload.clone(),
});
if let Err(e) = quic_v311::broadcast_packet(&connects[..], &packet) {
Err(Error::new(ErrorKind::BrokenPipe,
format!("Mqtt quic broker broadcast failed, broker: {:?}, topic: {:?}, reason: {:?}",
topic,
broker_name,
e)))
} else {
Ok(())
}
} else {
Err(Error::new(ErrorKind::Other,
format!("Mqtt quic broker broadcast failed, broker: {:?}, topic: {:?}, reason: the specified topic is not subscribed",
topic,
broker_name)))
}
}