use crate::{
client::{BrokerOptions, Client, MqttEvent, TopicEvent, TopicHandlerMap},
Result,
};
use bee_common::packable::Packable;
use bee_message::Message;
use crypto::utils;
use log::warn;
use regex::Regex;
use rumqttc::{
AsyncClient as MqttClient, Event, EventLoop, Incoming, MqttOptions, QoS, Request, Subscribe, SubscribeFilter,
Transport,
};
use tokio::sync::{watch::Sender, RwLock};
use std::{convert::TryFrom, sync::Arc, time::Instant};
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Topic(String);
impl TryFrom<&str> for Topic {
type Error = crate::Error;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
Self::new(value)
}
}
impl Topic {
pub fn new<S: Into<String>>(name: S) -> Result<Self> {
let mut name: String = name.into();
let indexation_beginning = "messages/indexation/";
if name.len() > indexation_beginning.len()
&& &name[0..indexation_beginning.len()] == indexation_beginning
&& hex::decode(&name[indexation_beginning.len()..name.len()]).is_err()
{
name = format!(
"messages/indexation/{}",
hex::encode(&name[indexation_beginning.len()..name.len()])
);
}
let valid_topics = lazy_static!(
["milestones/latest", "milestones/confirmed", "messages", "messages/referenced"].to_vec() => Vec<&str>
);
let regexes = lazy_static!(
[
Regex::new(r"messages/([A-Fa-f0-9]{64})/metadata").expect("regex failed"),
Regex::new(r"outputs/([A-Fa-f0-9]{64})(\d{4})").expect("regex failed"),
Regex::new("addresses/[\x21-\x7E]{1,30}1[A-Za-z0-9]+/outputs").expect("regex failed"),
Regex::new("addresses/ed25519/([A-Fa-f0-9]{64})/outputs").expect("regex failed"),
Regex::new(r"messages/indexation/([a-f0-9]{2,128})").expect("regex failed"),
Regex::new(r"transactions/([A-Fa-f0-9]{64})/included-message").expect("regex failed"),
].to_vec() => Vec<Regex>
);
if valid_topics.iter().any(|valid| valid == &name) || regexes.iter().any(|re| re.is_match(&name)) {
let topic = Self(name);
Ok(topic)
} else {
Err(crate::Error::InvalidMqttTopic(name))
}
}
}
async fn get_mqtt_client(client: &mut Client) -> Result<&mut MqttClient> {
if *client.mqtt_event_receiver().borrow() == MqttEvent::Disconnected {
client.mqtt_client = None;
}
match client.mqtt_client {
Some(ref mut c) => Ok(c),
None => {
let nodes = if client.node_manager.sync {
#[cfg(not(feature = "wasm"))]
{
client
.node_manager
.synced_nodes
.read()
.map_or(client.node_manager.nodes.clone(), |synced_nodes| synced_nodes.clone())
}
#[cfg(feature = "wasm")]
{
client.node_manager.nodes.clone()
}
} else {
client.node_manager.nodes.clone()
};
for node in nodes.iter() {
let host = node.url.host_str().expect("Can't get host from URL");
let mut entropy = [0u8; 8];
utils::rand::fill(&mut entropy)?;
let id = format!("iotars{}", hex::encode(entropy));
let port = client.broker_options.port;
let mut uri = format!(
"{}://{}:{}/mqtt",
if node.url.scheme() == "https" { "wss" } else { "ws" },
host,
node.url.port_or_known_default().unwrap_or(port)
);
if !client.broker_options.use_ws {
uri = host.to_string();
};
let mut mqtt_options = MqttOptions::new(id, uri, port);
if client.broker_options.use_ws {
mqtt_options.set_transport(Transport::ws());
}
mqtt_options.set_connection_timeout(client.broker_options.timeout.as_secs());
let (_, mut connection) = MqttClient::new(mqtt_options.clone(), 10);
let mut got_ack = false;
while let Ok(event) = connection.poll().await {
if let Event::Incoming(Incoming::ConnAck(_)) = event {
got_ack = true;
break;
}
}
if got_ack {
let (mqtt_client, connection) = MqttClient::new(mqtt_options, 10);
client.mqtt_client.replace(mqtt_client);
poll_mqtt(
client.mqtt_topic_handlers.clone(),
client.broker_options.clone(),
client.mqtt_event_channel.0.clone(),
connection,
);
}
}
client.mqtt_client.as_mut().ok_or(crate::Error::MqttConnectionNotFound)
}
}
}
fn poll_mqtt(
mqtt_topic_handlers_guard: Arc<RwLock<TopicHandlerMap>>,
options: BrokerOptions,
event_sender: Arc<Sender<MqttEvent>>,
mut event_loop: EventLoop,
) {
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
runtime.block_on(async move {
let mut is_subscribed = true;
let mut error_instant = Instant::now();
let mut connection_failure_count = 0;
let handle = event_loop.handle();
loop {
let event = event_loop.poll().await;
let mqtt_topic_handlers_guard = mqtt_topic_handlers_guard.clone();
match event {
Ok(Event::Incoming(Incoming::ConnAck(_))) => {
let _ = event_sender.send(MqttEvent::Connected);
if !is_subscribed {
is_subscribed = true;
let mqtt_topic_handlers = mqtt_topic_handlers_guard.read().await;
let topics = mqtt_topic_handlers
.keys()
.map(|t| SubscribeFilter::new(t.0.clone(), QoS::AtLeastOnce))
.collect::<Vec<SubscribeFilter>>();
if !topics.is_empty() {
let _ = handle.send(Request::Subscribe(Subscribe::new_many(topics))).await;
}
}
}
Ok(Event::Incoming(Incoming::Publish(p))) => {
let topic = p.topic.clone();
crate::async_runtime::spawn(async move {
let mqtt_topic_handlers = mqtt_topic_handlers_guard.read().await;
if let Some(handlers) = mqtt_topic_handlers.get(&Topic(topic.clone())) {
let event = {
if topic.as_str() == "messages"
|| topic.contains("messages/indexation/")
|| topic.contains("transactions/")
{
let mut payload = &*p.payload;
match Message::unpack(&mut payload) {
Ok(iota_message) => match serde_json::to_string(&iota_message) {
Ok(message) => Ok(TopicEvent {
topic,
payload: message,
}),
Err(e) => {
warn!("Parsing iota message failed: {0}", e);
Err(())
}
},
Err(e) => {
warn!("Message unpacking failed: {0}", e);
Err(())
}
}
} else {
Ok(TopicEvent {
topic,
payload: String::from_utf8_lossy(&p.payload).to_string(),
})
}
};
if let Ok(event) = event {
for handler in handlers {
handler(&event)
}
};
}
});
}
Err(_) => {
if error_instant.elapsed().as_secs() < 5 {
connection_failure_count += 1;
} else {
connection_failure_count = 1;
}
if connection_failure_count == options.max_reconnection_attempts {
let _ = event_sender.send(MqttEvent::Disconnected);
break;
}
error_instant = Instant::now();
is_subscribed = false;
}
_ => {}
}
}
});
});
}
pub struct MqttManager<'a> {
client: &'a mut Client,
}
impl<'a> MqttManager<'a> {
pub fn new(client: &'a mut Client) -> Self {
Self { client }
}
pub fn with_topic(self, topic: Topic) -> MqttTopicManager<'a> {
MqttTopicManager::new(self.client).with_topic(topic)
}
pub fn with_topics(self, topics: Vec<Topic>) -> MqttTopicManager<'a> {
MqttTopicManager::new(self.client).with_topics(topics)
}
pub async fn unsubscribe(self) -> crate::Result<()> {
MqttTopicManager::new(self.client).unsubscribe().await
}
pub async fn disconnect(self) -> Result<()> {
if let Some(client) = &self.client.mqtt_client {
client.disconnect().await?;
self.client.mqtt_client = None;
let mqtt_topic_handlers = &self.client.mqtt_topic_handlers;
let mut mqtt_topic_handlers = mqtt_topic_handlers.write().await;
mqtt_topic_handlers.clear()
}
Ok(())
}
}
pub struct MqttTopicManager<'a> {
client: &'a mut Client,
topics: Vec<Topic>,
}
impl<'a> MqttTopicManager<'a> {
fn new(client: &'a mut Client) -> Self {
Self { client, topics: vec![] }
}
pub fn with_topic(mut self, topic: Topic) -> Self {
self.topics.push(topic);
self
}
pub fn with_topics(mut self, topics: Vec<Topic>) -> Self {
self.topics.extend(topics.into_iter());
self
}
pub async fn subscribe<C: Fn(&crate::client::TopicEvent) + Send + Sync + 'static>(self, callback: C) -> Result<()> {
let client = get_mqtt_client(self.client).await?;
let cb = Arc::new(Box::new(callback) as Box<dyn Fn(&crate::client::TopicEvent) + Send + Sync + 'static>);
client
.subscribe_many(
self.topics
.iter()
.map(|t| SubscribeFilter::new(t.0.clone(), QoS::AtLeastOnce))
.collect::<Vec<SubscribeFilter>>(),
)
.await?;
{
let mqtt_topic_handlers = &self.client.mqtt_topic_handlers;
let mut mqtt_topic_handlers = mqtt_topic_handlers.write().await;
for topic in self.topics {
match mqtt_topic_handlers.get_mut(&topic) {
Some(handlers) => handlers.push(cb.clone()),
None => {
mqtt_topic_handlers.insert(topic, vec![cb.clone()]);
}
}
}
}
Ok(())
}
pub async fn unsubscribe(self) -> Result<()> {
let topics = {
let mqtt_topic_handlers = &self.client.mqtt_topic_handlers;
let mqtt_topic_handlers = mqtt_topic_handlers.read().await;
if self.topics.is_empty() {
mqtt_topic_handlers.keys().cloned().collect()
} else {
self.topics
}
};
if let Some(client) = &mut self.client.mqtt_client {
for topic in topics.iter() {
client.unsubscribe(&topic.0).await?;
}
}
let empty_topic_handlers = {
let mqtt_topic_handlers = &self.client.mqtt_topic_handlers;
let mut mqtt_topic_handlers = mqtt_topic_handlers.write().await;
for topic in topics {
mqtt_topic_handlers.remove(&topic);
}
mqtt_topic_handlers.is_empty()
};
if self.client.broker_options.automatic_disconnect && empty_topic_handlers {
MqttManager::new(self.client).disconnect().await?;
}
Ok(())
}
}